blob: 1859d29c2138427060eb289663d133b5c3f0e13c [file] [log] [blame]
Scott Bakered4efab2020-01-13 19:12:25 -08001package sarama
2
3import (
4 "bufio"
5 "fmt"
6 "net"
7 "regexp"
8)
9
// none is an empty struct type; its values carry no data and occupy no storage.
type none struct{}
11
// int32Slice gives []int32 the Len/Less/Swap method set so that partition
// numbers can be sorted in ascending order.
type int32Slice []int32

// Len reports how many elements the slice holds.
func (s int32Slice) Len() int { return len(s) }

// Less reports whether the element at index i is smaller than the one at index j.
func (s int32Slice) Less(i, j int) bool { return s[i] < s[j] }

// Swap exchanges the elements at indexes i and j in place.
func (s int32Slice) Swap(i, j int) {
	tmp := s[i]
	s[i] = s[j]
	s[j] = tmp
}
26
// dupInt32Slice returns a freshly allocated copy of input. The result never
// shares storage with input and is non-nil even when input is nil or empty.
func dupInt32Slice(input []int32) []int32 {
	dup := make([]int32, len(input))
	copy(dup, input)
	return dup
}
32
33func withRecover(fn func()) {
34 defer func() {
35 handler := PanicHandler
36 if handler != nil {
37 if err := recover(); err != nil {
38 handler(err)
39 }
40 }
41 }()
42
43 fn()
44}
45
46func safeAsyncClose(b *Broker) {
47 tmp := b // local var prevents clobbering in goroutine
48 go withRecover(func() {
49 if connected, _ := tmp.Connected(); connected {
50 if err := tmp.Close(); err != nil {
51 Logger.Println("Error closing broker", tmp.ID(), ":", err)
52 }
53 }
54 })
55}
56
// Encoder is a simple interface for any type that can be encoded as an array of bytes
// in order to be sent as the key or value of a Kafka message. Length() is provided as an
// optimization, and must return the same as len() on the result of Encode().
type Encoder interface {
	// Encode returns the encoded bytes, or an error if the value cannot be encoded.
	Encode() ([]byte, error)
	// Length returns the number of bytes that Encode produces.
	Length() int
}
64
// Strings and byte slices are made encodable for convenience, so that they can
// be used directly as keys and/or values in Kafka messages.
67
// StringEncoder implements the Encoder interface for Go strings so that they can be used
// as the Key or Value in a ProducerMessage.
type StringEncoder string

// Encode returns the bytes of the string. The returned error is always nil.
func (s StringEncoder) Encode() ([]byte, error) {
	encoded := []byte(s)
	return encoded, nil
}

// Length returns the length of the string in bytes, which equals the length
// of the slice returned by Encode.
func (s StringEncoder) Length() int {
	return len(string(s))
}
79
// ByteEncoder implements the Encoder interface for Go byte slices so that they can be used
// as the Key or Value in a ProducerMessage.
type ByteEncoder []byte

// Encode returns the underlying byte slice itself (no copy is made). The
// returned error is always nil.
func (b ByteEncoder) Encode() ([]byte, error) {
	return []byte(b), nil
}

// Length returns the number of bytes in the slice.
func (b ByteEncoder) Length() int {
	return len([]byte(b))
}
91
// bufConn wraps a net.Conn with a buffer for reads to reduce the number of
// reads that trigger syscalls. Every method other than Read is promoted
// unchanged from the embedded net.Conn.
type bufConn struct {
	net.Conn
	buf *bufio.Reader
}

// newBufConn returns a bufConn whose reads are served through a bufio.Reader
// layered on top of conn.
func newBufConn(conn net.Conn) *bufConn {
	bc := new(bufConn)
	bc.Conn = conn
	bc.buf = bufio.NewReader(conn)
	return bc
}

// Read shadows the embedded Conn's Read so that data is taken from the
// internal buffer rather than straight from the connection.
func (bc *bufConn) Read(b []byte) (n int, err error) {
	n, err = bc.buf.Read(b)
	return n, err
}
109
// KafkaVersion instances represent versions of the upstream Kafka broker.
type KafkaVersion struct {
	// The array is wrapped in a struct, rather than used as the type directly,
	// to keep it opaque and stop people generating their own arbitrary versions.
	version [4]uint
}

// newKafkaVersion builds a KafkaVersion from its four numeric components,
// stored in the order major, minor, veryMinor, patch.
func newKafkaVersion(major, minor, veryMinor, patch uint) KafkaVersion {
	var v KafkaVersion
	v.version = [4]uint{major, minor, veryMinor, patch}
	return v
}

// IsAtLeast returns true if and only if the version it is called on is
// greater than or equal to the version passed in:
//
//	V1.IsAtLeast(V2) // false
//	V2.IsAtLeast(V1) // true
func (v KafkaVersion) IsAtLeast(other KafkaVersion) bool {
	// Compare component by component, most significant first; the first
	// component that differs decides the result.
	for i, part := range v.version {
		switch {
		case part > other.version[i]:
			return true
		case part < other.version[i]:
			return false
		}
	}
	// All four components are equal.
	return true
}
137
// Effective constants defining the supported kafka versions.
// They are declared as variables because a struct value such as KafkaVersion
// cannot be a Go constant. Each name spells out the four components passed to
// newKafkaVersion (major, minor, veryMinor, patch).
var (
	V0_8_2_0  = newKafkaVersion(0, 8, 2, 0)
	V0_8_2_1  = newKafkaVersion(0, 8, 2, 1)
	V0_8_2_2  = newKafkaVersion(0, 8, 2, 2)
	V0_9_0_0  = newKafkaVersion(0, 9, 0, 0)
	V0_9_0_1  = newKafkaVersion(0, 9, 0, 1)
	V0_10_0_0 = newKafkaVersion(0, 10, 0, 0)
	V0_10_0_1 = newKafkaVersion(0, 10, 0, 1)
	V0_10_1_0 = newKafkaVersion(0, 10, 1, 0)
	V0_10_1_1 = newKafkaVersion(0, 10, 1, 1)
	V0_10_2_0 = newKafkaVersion(0, 10, 2, 0)
	V0_10_2_1 = newKafkaVersion(0, 10, 2, 1)
	V0_11_0_0 = newKafkaVersion(0, 11, 0, 0)
	V0_11_0_1 = newKafkaVersion(0, 11, 0, 1)
	V0_11_0_2 = newKafkaVersion(0, 11, 0, 2)
	V1_0_0_0  = newKafkaVersion(1, 0, 0, 0)
	V1_1_0_0  = newKafkaVersion(1, 1, 0, 0)
	V1_1_1_0  = newKafkaVersion(1, 1, 1, 0)
	V2_0_0_0  = newKafkaVersion(2, 0, 0, 0)
	V2_0_1_0  = newKafkaVersion(2, 0, 1, 0)
	V2_1_0_0  = newKafkaVersion(2, 1, 0, 0)
	V2_2_0_0  = newKafkaVersion(2, 2, 0, 0)
	V2_3_0_0  = newKafkaVersion(2, 3, 0, 0)
	V2_4_0_0  = newKafkaVersion(2, 4, 0, 0)
	V2_5_0_0  = newKafkaVersion(2, 5, 0, 0)
	V2_6_0_0  = newKafkaVersion(2, 6, 0, 0)
	V2_7_0_0  = newKafkaVersion(2, 7, 0, 0)
	V2_8_0_0  = newKafkaVersion(2, 8, 0, 0)

	// SupportedVersions lists every version declared above, in ascending order.
	SupportedVersions = []KafkaVersion{
		V0_8_2_0,
		V0_8_2_1,
		V0_8_2_2,
		V0_9_0_0,
		V0_9_0_1,
		V0_10_0_0,
		V0_10_0_1,
		V0_10_1_0,
		V0_10_1_1,
		V0_10_2_0,
		V0_10_2_1,
		V0_11_0_0,
		V0_11_0_1,
		V0_11_0_2,
		V1_0_0_0,
		V1_1_0_0,
		V1_1_1_0,
		V2_0_0_0,
		V2_0_1_0,
		V2_1_0_0,
		V2_2_0_0,
		V2_3_0_0,
		V2_4_0_0,
		V2_5_0_0,
		V2_6_0_0,
		V2_7_0_0,
		V2_8_0_0,
	}
	// MinVersion and MaxVersion are the first and last entries of
	// SupportedVersions. DefaultVersion is also the value ParseKafkaVersion
	// returns alongside an error.
	MinVersion     = V0_8_2_0
	MaxVersion     = V2_8_0_0
	DefaultVersion = V1_0_0_0
)
201
David K. Bainbridgebd6b2882021-08-26 13:31:02 +0000202// ParseKafkaVersion parses and returns kafka version or error from a string
Scott Bakered4efab2020-01-13 19:12:25 -0800203func ParseKafkaVersion(s string) (KafkaVersion, error) {
204 if len(s) < 5 {
David K. Bainbridgebd6b2882021-08-26 13:31:02 +0000205 return DefaultVersion, fmt.Errorf("invalid version `%s`", s)
Scott Bakered4efab2020-01-13 19:12:25 -0800206 }
207 var major, minor, veryMinor, patch uint
208 var err error
209 if s[0] == '0' {
210 err = scanKafkaVersion(s, `^0\.\d+\.\d+\.\d+$`, "0.%d.%d.%d", [3]*uint{&minor, &veryMinor, &patch})
211 } else {
212 err = scanKafkaVersion(s, `^\d+\.\d+\.\d+$`, "%d.%d.%d", [3]*uint{&major, &minor, &veryMinor})
213 }
214 if err != nil {
David K. Bainbridgebd6b2882021-08-26 13:31:02 +0000215 return DefaultVersion, err
Scott Bakered4efab2020-01-13 19:12:25 -0800216 }
217 return newKafkaVersion(major, minor, veryMinor, patch), nil
218}
219
220func scanKafkaVersion(s string, pattern string, format string, v [3]*uint) error {
221 if !regexp.MustCompile(pattern).MatchString(s) {
222 return fmt.Errorf("invalid version `%s`", s)
223 }
224 _, err := fmt.Sscanf(s, format, v[0], v[1], v[2])
225 return err
226}
227
228func (v KafkaVersion) String() string {
229 if v.version[0] == 0 {
230 return fmt.Sprintf("0.%d.%d.%d", v.version[1], v.version[2], v.version[3])
231 }
232
233 return fmt.Sprintf("%d.%d.%d", v.version[0], v.version[1], v.version[2])
234}