blob: 4c5cd3569c6a1f46339165fd1efd2d0f60ee49f4 [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001package sarama
2
3import (
4 "fmt"
5 "time"
6)
7
8type ProduceResponseBlock struct {
9 Err KError
10 Offset int64
11 // only provided if Version >= 2 and the broker is configured with `LogAppendTime`
12 Timestamp time.Time
13}
14
15func (b *ProduceResponseBlock) decode(pd packetDecoder, version int16) (err error) {
16 tmp, err := pd.getInt16()
17 if err != nil {
18 return err
19 }
20 b.Err = KError(tmp)
21
22 b.Offset, err = pd.getInt64()
23 if err != nil {
24 return err
25 }
26
27 if version >= 2 {
28 if millis, err := pd.getInt64(); err != nil {
29 return err
30 } else if millis != -1 {
31 b.Timestamp = time.Unix(millis/1000, (millis%1000)*int64(time.Millisecond))
32 }
33 }
34
35 return nil
36}
37
38func (b *ProduceResponseBlock) encode(pe packetEncoder, version int16) (err error) {
39 pe.putInt16(int16(b.Err))
40 pe.putInt64(b.Offset)
41
42 if version >= 2 {
43 timestamp := int64(-1)
44 if !b.Timestamp.Before(time.Unix(0, 0)) {
45 timestamp = b.Timestamp.UnixNano() / int64(time.Millisecond)
46 } else if !b.Timestamp.IsZero() {
47 return PacketEncodingError{fmt.Sprintf("invalid timestamp (%v)", b.Timestamp)}
48 }
49 pe.putInt64(timestamp)
50 }
51
52 return nil
53}
54
55type ProduceResponse struct {
56 Blocks map[string]map[int32]*ProduceResponseBlock
57 Version int16
58 ThrottleTime time.Duration // only provided if Version >= 1
59}
60
61func (r *ProduceResponse) decode(pd packetDecoder, version int16) (err error) {
62 r.Version = version
63
64 numTopics, err := pd.getArrayLength()
65 if err != nil {
66 return err
67 }
68
69 r.Blocks = make(map[string]map[int32]*ProduceResponseBlock, numTopics)
70 for i := 0; i < numTopics; i++ {
71 name, err := pd.getString()
72 if err != nil {
73 return err
74 }
75
76 numBlocks, err := pd.getArrayLength()
77 if err != nil {
78 return err
79 }
80
81 r.Blocks[name] = make(map[int32]*ProduceResponseBlock, numBlocks)
82
83 for j := 0; j < numBlocks; j++ {
84 id, err := pd.getInt32()
85 if err != nil {
86 return err
87 }
88
89 block := new(ProduceResponseBlock)
90 err = block.decode(pd, version)
91 if err != nil {
92 return err
93 }
94 r.Blocks[name][id] = block
95 }
96 }
97
98 if r.Version >= 1 {
99 millis, err := pd.getInt32()
100 if err != nil {
101 return err
102 }
103
104 r.ThrottleTime = time.Duration(millis) * time.Millisecond
105 }
106
107 return nil
108}
109
110func (r *ProduceResponse) encode(pe packetEncoder) error {
111 err := pe.putArrayLength(len(r.Blocks))
112 if err != nil {
113 return err
114 }
115 for topic, partitions := range r.Blocks {
116 err = pe.putString(topic)
117 if err != nil {
118 return err
119 }
120 err = pe.putArrayLength(len(partitions))
121 if err != nil {
122 return err
123 }
124 for id, prb := range partitions {
125 pe.putInt32(id)
126 err = prb.encode(pe, r.Version)
127 if err != nil {
128 return err
129 }
130 }
131 }
132 if r.Version >= 1 {
133 pe.putInt32(int32(r.ThrottleTime / time.Millisecond))
134 }
135 return nil
136}
137
138func (r *ProduceResponse) key() int16 {
139 return 0
140}
141
142func (r *ProduceResponse) version() int16 {
143 return r.Version
144}
145
146func (r *ProduceResponse) requiredVersion() KafkaVersion {
147 switch r.Version {
148 case 1:
149 return V0_9_0_0
150 case 2:
151 return V0_10_0_0
152 case 3:
153 return V0_11_0_0
154 default:
155 return MinVersion
156 }
157}
158
159func (r *ProduceResponse) GetBlock(topic string, partition int32) *ProduceResponseBlock {
160 if r.Blocks == nil {
161 return nil
162 }
163
164 if r.Blocks[topic] == nil {
165 return nil
166 }
167
168 return r.Blocks[topic][partition]
169}
170
171// Testing API
172
173func (r *ProduceResponse) AddTopicPartition(topic string, partition int32, err KError) {
174 if r.Blocks == nil {
175 r.Blocks = make(map[string]map[int32]*ProduceResponseBlock)
176 }
177 byTopic, ok := r.Blocks[topic]
178 if !ok {
179 byTopic = make(map[int32]*ProduceResponseBlock)
180 r.Blocks[topic] = byTopic
181 }
182 block := &ProduceResponseBlock{
183 Err: err,
184 }
185 if r.Version >= 2 {
186 block.Timestamp = time.Now()
187 }
188 byTopic[partition] = block
189}