blob: 326c3720cc7a599a3d37f153b042006a83b216ed [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001package sarama
2
3type offsetRequestBlock struct {
4 time int64
5 maxOffsets int32 // Only used in version 0
6}
7
8func (b *offsetRequestBlock) encode(pe packetEncoder, version int16) error {
9 pe.putInt64(int64(b.time))
10 if version == 0 {
11 pe.putInt32(b.maxOffsets)
12 }
13
14 return nil
15}
16
17func (b *offsetRequestBlock) decode(pd packetDecoder, version int16) (err error) {
18 if b.time, err = pd.getInt64(); err != nil {
19 return err
20 }
21 if version == 0 {
22 if b.maxOffsets, err = pd.getInt32(); err != nil {
23 return err
24 }
25 }
26 return nil
27}
28
29type OffsetRequest struct {
30 Version int16
31 replicaID int32
32 isReplicaIDSet bool
33 blocks map[string]map[int32]*offsetRequestBlock
34}
35
36func (r *OffsetRequest) encode(pe packetEncoder) error {
37 if r.isReplicaIDSet {
38 pe.putInt32(r.replicaID)
39 } else {
40 // default replica ID is always -1 for clients
41 pe.putInt32(-1)
42 }
43
44 err := pe.putArrayLength(len(r.blocks))
45 if err != nil {
46 return err
47 }
48 for topic, partitions := range r.blocks {
49 err = pe.putString(topic)
50 if err != nil {
51 return err
52 }
53 err = pe.putArrayLength(len(partitions))
54 if err != nil {
55 return err
56 }
57 for partition, block := range partitions {
58 pe.putInt32(partition)
59 if err = block.encode(pe, r.Version); err != nil {
60 return err
61 }
62 }
63 }
64 return nil
65}
66
67func (r *OffsetRequest) decode(pd packetDecoder, version int16) error {
68 r.Version = version
69
70 replicaID, err := pd.getInt32()
71 if err != nil {
72 return err
73 }
74 if replicaID >= 0 {
75 r.SetReplicaID(replicaID)
76 }
77
78 blockCount, err := pd.getArrayLength()
79 if err != nil {
80 return err
81 }
82 if blockCount == 0 {
83 return nil
84 }
85 r.blocks = make(map[string]map[int32]*offsetRequestBlock)
86 for i := 0; i < blockCount; i++ {
87 topic, err := pd.getString()
88 if err != nil {
89 return err
90 }
91 partitionCount, err := pd.getArrayLength()
92 if err != nil {
93 return err
94 }
95 r.blocks[topic] = make(map[int32]*offsetRequestBlock)
96 for j := 0; j < partitionCount; j++ {
97 partition, err := pd.getInt32()
98 if err != nil {
99 return err
100 }
101 block := &offsetRequestBlock{}
102 if err := block.decode(pd, version); err != nil {
103 return err
104 }
105 r.blocks[topic][partition] = block
106 }
107 }
108 return nil
109}
110
111func (r *OffsetRequest) key() int16 {
112 return 2
113}
114
115func (r *OffsetRequest) version() int16 {
116 return r.Version
117}
118
119func (r *OffsetRequest) requiredVersion() KafkaVersion {
120 switch r.Version {
121 case 1:
122 return V0_10_1_0
123 default:
124 return MinVersion
125 }
126}
127
128func (r *OffsetRequest) SetReplicaID(id int32) {
129 r.replicaID = id
130 r.isReplicaIDSet = true
131}
132
133func (r *OffsetRequest) ReplicaID() int32 {
134 if r.isReplicaIDSet {
135 return r.replicaID
136 }
137 return -1
138}
139
140func (r *OffsetRequest) AddBlock(topic string, partitionID int32, time int64, maxOffsets int32) {
141 if r.blocks == nil {
142 r.blocks = make(map[string]map[int32]*offsetRequestBlock)
143 }
144
145 if r.blocks[topic] == nil {
146 r.blocks[topic] = make(map[int32]*offsetRequestBlock)
147 }
148
149 tmp := new(offsetRequestBlock)
150 tmp.time = time
151 if r.Version == 0 {
152 tmp.maxOffsets = maxOffsets
153 }
154
155 r.blocks[topic][partitionID] = tmp
156}