blob: edf978790c9fca800e022444404704a7957ba19b [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001package sarama
2
3import (
4 "fmt"
5 "time"
6)
7
khenaidood948f772021-08-11 17:49:24 -04008// Protocol, http://kafka.apache.org/protocol.html
9// v1
10// v2 = v3 = v4
11// v5 = v6 = v7
12// Produce Response (Version: 7) => [responses] throttle_time_ms
13// responses => topic [partition_responses]
14// topic => STRING
15// partition_responses => partition error_code base_offset log_append_time log_start_offset
16// partition => INT32
17// error_code => INT16
18// base_offset => INT64
19// log_append_time => INT64
20// log_start_offset => INT64
21// throttle_time_ms => INT32
22
23// partition_responses in protocol
khenaidooac637102019-01-14 15:44:34 -050024type ProduceResponseBlock struct {
khenaidood948f772021-08-11 17:49:24 -040025 Err KError // v0, error_code
26 Offset int64 // v0, base_offset
27 Timestamp time.Time // v2, log_append_time, and the broker is configured with `LogAppendTime`
28 StartOffset int64 // v5, log_start_offset
khenaidooac637102019-01-14 15:44:34 -050029}
30
31func (b *ProduceResponseBlock) decode(pd packetDecoder, version int16) (err error) {
32 tmp, err := pd.getInt16()
33 if err != nil {
34 return err
35 }
36 b.Err = KError(tmp)
37
38 b.Offset, err = pd.getInt64()
39 if err != nil {
40 return err
41 }
42
43 if version >= 2 {
44 if millis, err := pd.getInt64(); err != nil {
45 return err
46 } else if millis != -1 {
47 b.Timestamp = time.Unix(millis/1000, (millis%1000)*int64(time.Millisecond))
48 }
49 }
50
khenaidood948f772021-08-11 17:49:24 -040051 if version >= 5 {
52 b.StartOffset, err = pd.getInt64()
53 if err != nil {
54 return err
55 }
56 }
57
khenaidooac637102019-01-14 15:44:34 -050058 return nil
59}
60
61func (b *ProduceResponseBlock) encode(pe packetEncoder, version int16) (err error) {
62 pe.putInt16(int16(b.Err))
63 pe.putInt64(b.Offset)
64
65 if version >= 2 {
66 timestamp := int64(-1)
67 if !b.Timestamp.Before(time.Unix(0, 0)) {
68 timestamp = b.Timestamp.UnixNano() / int64(time.Millisecond)
69 } else if !b.Timestamp.IsZero() {
70 return PacketEncodingError{fmt.Sprintf("invalid timestamp (%v)", b.Timestamp)}
71 }
72 pe.putInt64(timestamp)
73 }
74
khenaidood948f772021-08-11 17:49:24 -040075 if version >= 5 {
76 pe.putInt64(b.StartOffset)
77 }
78
khenaidooac637102019-01-14 15:44:34 -050079 return nil
80}
81
82type ProduceResponse struct {
khenaidood948f772021-08-11 17:49:24 -040083 Blocks map[string]map[int32]*ProduceResponseBlock // v0, responses
khenaidooac637102019-01-14 15:44:34 -050084 Version int16
khenaidood948f772021-08-11 17:49:24 -040085 ThrottleTime time.Duration // v1, throttle_time_ms
khenaidooac637102019-01-14 15:44:34 -050086}
87
88func (r *ProduceResponse) decode(pd packetDecoder, version int16) (err error) {
89 r.Version = version
90
91 numTopics, err := pd.getArrayLength()
92 if err != nil {
93 return err
94 }
95
96 r.Blocks = make(map[string]map[int32]*ProduceResponseBlock, numTopics)
97 for i := 0; i < numTopics; i++ {
98 name, err := pd.getString()
99 if err != nil {
100 return err
101 }
102
103 numBlocks, err := pd.getArrayLength()
104 if err != nil {
105 return err
106 }
107
108 r.Blocks[name] = make(map[int32]*ProduceResponseBlock, numBlocks)
109
110 for j := 0; j < numBlocks; j++ {
111 id, err := pd.getInt32()
112 if err != nil {
113 return err
114 }
115
116 block := new(ProduceResponseBlock)
117 err = block.decode(pd, version)
118 if err != nil {
119 return err
120 }
121 r.Blocks[name][id] = block
122 }
123 }
124
125 if r.Version >= 1 {
126 millis, err := pd.getInt32()
127 if err != nil {
128 return err
129 }
130
131 r.ThrottleTime = time.Duration(millis) * time.Millisecond
132 }
133
134 return nil
135}
136
137func (r *ProduceResponse) encode(pe packetEncoder) error {
138 err := pe.putArrayLength(len(r.Blocks))
139 if err != nil {
140 return err
141 }
142 for topic, partitions := range r.Blocks {
143 err = pe.putString(topic)
144 if err != nil {
145 return err
146 }
147 err = pe.putArrayLength(len(partitions))
148 if err != nil {
149 return err
150 }
151 for id, prb := range partitions {
152 pe.putInt32(id)
153 err = prb.encode(pe, r.Version)
154 if err != nil {
155 return err
156 }
157 }
158 }
khenaidood948f772021-08-11 17:49:24 -0400159
khenaidooac637102019-01-14 15:44:34 -0500160 if r.Version >= 1 {
161 pe.putInt32(int32(r.ThrottleTime / time.Millisecond))
162 }
163 return nil
164}
165
166func (r *ProduceResponse) key() int16 {
167 return 0
168}
169
170func (r *ProduceResponse) version() int16 {
171 return r.Version
172}
173
khenaidood948f772021-08-11 17:49:24 -0400174func (r *ProduceResponse) headerVersion() int16 {
175 return 0
176}
177
khenaidooac637102019-01-14 15:44:34 -0500178func (r *ProduceResponse) requiredVersion() KafkaVersion {
khenaidood948f772021-08-11 17:49:24 -0400179 return MinVersion
khenaidooac637102019-01-14 15:44:34 -0500180}
181
182func (r *ProduceResponse) GetBlock(topic string, partition int32) *ProduceResponseBlock {
183 if r.Blocks == nil {
184 return nil
185 }
186
187 if r.Blocks[topic] == nil {
188 return nil
189 }
190
191 return r.Blocks[topic][partition]
192}
193
194// Testing API
195
196func (r *ProduceResponse) AddTopicPartition(topic string, partition int32, err KError) {
197 if r.Blocks == nil {
198 r.Blocks = make(map[string]map[int32]*ProduceResponseBlock)
199 }
200 byTopic, ok := r.Blocks[topic]
201 if !ok {
202 byTopic = make(map[int32]*ProduceResponseBlock)
203 r.Blocks[topic] = byTopic
204 }
205 block := &ProduceResponseBlock{
206 Err: err,
207 }
208 if r.Version >= 2 {
209 block.Timestamp = time.Now()
210 }
211 byTopic[partition] = block
212}