blob: 462ab8afbb8ef7f213d1955861685bafea851d90 [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001package sarama
2
3type fetchRequestBlock struct {
4 fetchOffset int64
5 maxBytes int32
6}
7
8func (b *fetchRequestBlock) encode(pe packetEncoder) error {
9 pe.putInt64(b.fetchOffset)
10 pe.putInt32(b.maxBytes)
11 return nil
12}
13
14func (b *fetchRequestBlock) decode(pd packetDecoder) (err error) {
15 if b.fetchOffset, err = pd.getInt64(); err != nil {
16 return err
17 }
18 if b.maxBytes, err = pd.getInt32(); err != nil {
19 return err
20 }
21 return nil
22}
23
24// FetchRequest (API key 1) will fetch Kafka messages. Version 3 introduced the MaxBytes field. See
25// https://issues.apache.org/jira/browse/KAFKA-2063 for a discussion of the issues leading up to that. The KIP is at
26// https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes
27type FetchRequest struct {
28 MaxWaitTime int32
29 MinBytes int32
30 MaxBytes int32
31 Version int16
32 Isolation IsolationLevel
33 blocks map[string]map[int32]*fetchRequestBlock
34}
35
36type IsolationLevel int8
37
38const (
39 ReadUncommitted IsolationLevel = 0
40 ReadCommitted IsolationLevel = 1
41)
42
43func (r *FetchRequest) encode(pe packetEncoder) (err error) {
44 pe.putInt32(-1) // replica ID is always -1 for clients
45 pe.putInt32(r.MaxWaitTime)
46 pe.putInt32(r.MinBytes)
47 if r.Version >= 3 {
48 pe.putInt32(r.MaxBytes)
49 }
50 if r.Version >= 4 {
51 pe.putInt8(int8(r.Isolation))
52 }
53 err = pe.putArrayLength(len(r.blocks))
54 if err != nil {
55 return err
56 }
57 for topic, blocks := range r.blocks {
58 err = pe.putString(topic)
59 if err != nil {
60 return err
61 }
62 err = pe.putArrayLength(len(blocks))
63 if err != nil {
64 return err
65 }
66 for partition, block := range blocks {
67 pe.putInt32(partition)
68 err = block.encode(pe)
69 if err != nil {
70 return err
71 }
72 }
73 }
74 return nil
75}
76
77func (r *FetchRequest) decode(pd packetDecoder, version int16) (err error) {
78 r.Version = version
79 if _, err = pd.getInt32(); err != nil {
80 return err
81 }
82 if r.MaxWaitTime, err = pd.getInt32(); err != nil {
83 return err
84 }
85 if r.MinBytes, err = pd.getInt32(); err != nil {
86 return err
87 }
88 if r.Version >= 3 {
89 if r.MaxBytes, err = pd.getInt32(); err != nil {
90 return err
91 }
92 }
93 if r.Version >= 4 {
94 isolation, err := pd.getInt8()
95 if err != nil {
96 return err
97 }
98 r.Isolation = IsolationLevel(isolation)
99 }
100 topicCount, err := pd.getArrayLength()
101 if err != nil {
102 return err
103 }
104 if topicCount == 0 {
105 return nil
106 }
107 r.blocks = make(map[string]map[int32]*fetchRequestBlock)
108 for i := 0; i < topicCount; i++ {
109 topic, err := pd.getString()
110 if err != nil {
111 return err
112 }
113 partitionCount, err := pd.getArrayLength()
114 if err != nil {
115 return err
116 }
117 r.blocks[topic] = make(map[int32]*fetchRequestBlock)
118 for j := 0; j < partitionCount; j++ {
119 partition, err := pd.getInt32()
120 if err != nil {
121 return err
122 }
123 fetchBlock := &fetchRequestBlock{}
124 if err = fetchBlock.decode(pd); err != nil {
125 return err
126 }
127 r.blocks[topic][partition] = fetchBlock
128 }
129 }
130 return nil
131}
132
133func (r *FetchRequest) key() int16 {
134 return 1
135}
136
137func (r *FetchRequest) version() int16 {
138 return r.Version
139}
140
141func (r *FetchRequest) requiredVersion() KafkaVersion {
142 switch r.Version {
143 case 1:
144 return V0_9_0_0
145 case 2:
146 return V0_10_0_0
147 case 3:
148 return V0_10_1_0
149 case 4:
150 return V0_11_0_0
151 default:
152 return MinVersion
153 }
154}
155
156func (r *FetchRequest) AddBlock(topic string, partitionID int32, fetchOffset int64, maxBytes int32) {
157 if r.blocks == nil {
158 r.blocks = make(map[string]map[int32]*fetchRequestBlock)
159 }
160
161 if r.blocks[topic] == nil {
162 r.blocks[topic] = make(map[int32]*fetchRequestBlock)
163 }
164
165 tmp := new(fetchRequestBlock)
166 tmp.maxBytes = maxBytes
167 tmp.fetchOffset = fetchOffset
168
169 r.blocks[topic][partitionID] = tmp
170}