blob: 7c815cd3a31cc4d3e0fc2fcecbf0aa37d4c2f37b [file] [log] [blame]
Scott Bakereee8dd82019-09-24 12:52:34 -07001package sarama
2
3import (
4 "bufio"
5 "fmt"
6 "net"
7 "regexp"
8)
9
10type none struct{}
11
12// make []int32 sortable so we can sort partition numbers
13type int32Slice []int32
14
15func (slice int32Slice) Len() int {
16 return len(slice)
17}
18
19func (slice int32Slice) Less(i, j int) bool {
20 return slice[i] < slice[j]
21}
22
23func (slice int32Slice) Swap(i, j int) {
24 slice[i], slice[j] = slice[j], slice[i]
25}
26
27func dupInt32Slice(input []int32) []int32 {
28 ret := make([]int32, 0, len(input))
29 for _, val := range input {
30 ret = append(ret, val)
31 }
32 return ret
33}
34
35func withRecover(fn func()) {
36 defer func() {
37 handler := PanicHandler
38 if handler != nil {
39 if err := recover(); err != nil {
40 handler(err)
41 }
42 }
43 }()
44
45 fn()
46}
47
48func safeAsyncClose(b *Broker) {
49 tmp := b // local var prevents clobbering in goroutine
50 go withRecover(func() {
51 if connected, _ := tmp.Connected(); connected {
52 if err := tmp.Close(); err != nil {
53 Logger.Println("Error closing broker", tmp.ID(), ":", err)
54 }
55 }
56 })
57}
58
59// Encoder is a simple interface for any type that can be encoded as an array of bytes
60// in order to be sent as the key or value of a Kafka message. Length() is provided as an
61// optimization, and must return the same as len() on the result of Encode().
62type Encoder interface {
63 Encode() ([]byte, error)
64 Length() int
65}
66
67// make strings and byte slices encodable for convenience so they can be used as keys
68// and/or values in kafka messages
69
70// StringEncoder implements the Encoder interface for Go strings so that they can be used
71// as the Key or Value in a ProducerMessage.
72type StringEncoder string
73
74func (s StringEncoder) Encode() ([]byte, error) {
75 return []byte(s), nil
76}
77
78func (s StringEncoder) Length() int {
79 return len(s)
80}
81
82// ByteEncoder implements the Encoder interface for Go byte slices so that they can be used
83// as the Key or Value in a ProducerMessage.
84type ByteEncoder []byte
85
86func (b ByteEncoder) Encode() ([]byte, error) {
87 return b, nil
88}
89
90func (b ByteEncoder) Length() int {
91 return len(b)
92}
93
94// bufConn wraps a net.Conn with a buffer for reads to reduce the number of
95// reads that trigger syscalls.
96type bufConn struct {
97 net.Conn
98 buf *bufio.Reader
99}
100
101func newBufConn(conn net.Conn) *bufConn {
102 return &bufConn{
103 Conn: conn,
104 buf: bufio.NewReader(conn),
105 }
106}
107
108func (bc *bufConn) Read(b []byte) (n int, err error) {
109 return bc.buf.Read(b)
110}
111
112// KafkaVersion instances represent versions of the upstream Kafka broker.
113type KafkaVersion struct {
114 // it's a struct rather than just typing the array directly to make it opaque and stop people
115 // generating their own arbitrary versions
116 version [4]uint
117}
118
119func newKafkaVersion(major, minor, veryMinor, patch uint) KafkaVersion {
120 return KafkaVersion{
121 version: [4]uint{major, minor, veryMinor, patch},
122 }
123}
124
125// IsAtLeast return true if and only if the version it is called on is
126// greater than or equal to the version passed in:
127// V1.IsAtLeast(V2) // false
128// V2.IsAtLeast(V1) // true
129func (v KafkaVersion) IsAtLeast(other KafkaVersion) bool {
130 for i := range v.version {
131 if v.version[i] > other.version[i] {
132 return true
133 } else if v.version[i] < other.version[i] {
134 return false
135 }
136 }
137 return true
138}
139
140// Effective constants defining the supported kafka versions.
141var (
142 V0_8_2_0 = newKafkaVersion(0, 8, 2, 0)
143 V0_8_2_1 = newKafkaVersion(0, 8, 2, 1)
144 V0_8_2_2 = newKafkaVersion(0, 8, 2, 2)
145 V0_9_0_0 = newKafkaVersion(0, 9, 0, 0)
146 V0_9_0_1 = newKafkaVersion(0, 9, 0, 1)
147 V0_10_0_0 = newKafkaVersion(0, 10, 0, 0)
148 V0_10_0_1 = newKafkaVersion(0, 10, 0, 1)
149 V0_10_1_0 = newKafkaVersion(0, 10, 1, 0)
150 V0_10_1_1 = newKafkaVersion(0, 10, 1, 1)
151 V0_10_2_0 = newKafkaVersion(0, 10, 2, 0)
152 V0_10_2_1 = newKafkaVersion(0, 10, 2, 1)
153 V0_11_0_0 = newKafkaVersion(0, 11, 0, 0)
154 V0_11_0_1 = newKafkaVersion(0, 11, 0, 1)
155 V0_11_0_2 = newKafkaVersion(0, 11, 0, 2)
156 V1_0_0_0 = newKafkaVersion(1, 0, 0, 0)
157 V1_1_0_0 = newKafkaVersion(1, 1, 0, 0)
158 V1_1_1_0 = newKafkaVersion(1, 1, 1, 0)
159 V2_0_0_0 = newKafkaVersion(2, 0, 0, 0)
160 V2_0_1_0 = newKafkaVersion(2, 0, 1, 0)
161 V2_1_0_0 = newKafkaVersion(2, 1, 0, 0)
Scott Baker611f6bd2019-10-18 13:45:19 -0700162 V2_2_0_0 = newKafkaVersion(2, 2, 0, 0)
163 V2_3_0_0 = newKafkaVersion(2, 3, 0, 0)
Scott Bakereee8dd82019-09-24 12:52:34 -0700164
165 SupportedVersions = []KafkaVersion{
166 V0_8_2_0,
167 V0_8_2_1,
168 V0_8_2_2,
169 V0_9_0_0,
170 V0_9_0_1,
171 V0_10_0_0,
172 V0_10_0_1,
173 V0_10_1_0,
174 V0_10_1_1,
175 V0_10_2_0,
176 V0_10_2_1,
177 V0_11_0_0,
178 V0_11_0_1,
179 V0_11_0_2,
180 V1_0_0_0,
181 V1_1_0_0,
182 V1_1_1_0,
183 V2_0_0_0,
184 V2_0_1_0,
185 V2_1_0_0,
Scott Baker611f6bd2019-10-18 13:45:19 -0700186 V2_2_0_0,
187 V2_3_0_0,
Scott Bakereee8dd82019-09-24 12:52:34 -0700188 }
189 MinVersion = V0_8_2_0
Scott Baker611f6bd2019-10-18 13:45:19 -0700190 MaxVersion = V2_3_0_0
Scott Bakereee8dd82019-09-24 12:52:34 -0700191)
192
Scott Baker611f6bd2019-10-18 13:45:19 -0700193//ParseKafkaVersion parses and returns kafka version or error from a string
Scott Bakereee8dd82019-09-24 12:52:34 -0700194func ParseKafkaVersion(s string) (KafkaVersion, error) {
195 if len(s) < 5 {
196 return MinVersion, fmt.Errorf("invalid version `%s`", s)
197 }
198 var major, minor, veryMinor, patch uint
199 var err error
200 if s[0] == '0' {
201 err = scanKafkaVersion(s, `^0\.\d+\.\d+\.\d+$`, "0.%d.%d.%d", [3]*uint{&minor, &veryMinor, &patch})
202 } else {
203 err = scanKafkaVersion(s, `^\d+\.\d+\.\d+$`, "%d.%d.%d", [3]*uint{&major, &minor, &veryMinor})
204 }
205 if err != nil {
206 return MinVersion, err
207 }
208 return newKafkaVersion(major, minor, veryMinor, patch), nil
209}
210
211func scanKafkaVersion(s string, pattern string, format string, v [3]*uint) error {
212 if !regexp.MustCompile(pattern).MatchString(s) {
213 return fmt.Errorf("invalid version `%s`", s)
214 }
215 _, err := fmt.Sscanf(s, format, v[0], v[1], v[2])
216 return err
217}
218
219func (v KafkaVersion) String() string {
220 if v.version[0] == 0 {
221 return fmt.Sprintf("0.%d.%d.%d", v.version[1], v.version[2], v.version[3])
222 }
223
224 return fmt.Sprintf("%d.%d.%d", v.version[0], v.version[1], v.version[2])
225}