blob: 4ed46a61aa4a514e9905175130c0f984aba547e6 [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001package sarama
2
3import (
4 "bytes"
5 "encoding/binary"
6 "fmt"
7 "io"
8 "net"
9 "reflect"
10 "strconv"
11 "sync"
12 "time"
13
14 "github.com/davecgh/go-spew/spew"
15)
16
// expectationTimeout is how long the default request handler waits for a
// queued response before it gives up and leaves the request unanswered.
const expectationTimeout = 500 * time.Millisecond
20
// GSSApiHandlerFunc handles a raw GSSAPI frame received by a MockBroker. It is
// given the complete frame as read from the connection (length prefix
// included) and returns the raw bytes to write back; a nil result means that
// nothing is sent in reply.
type GSSApiHandlerFunc func([]byte) []byte
22
khenaidooac637102019-01-14 15:44:34 -050023type requestHandlerFunc func(req *request) (res encoder)
24
// RequestNotifierFunc is invoked when a mock broker has processed a request
// successfully. It is given the number of bytes read and written for that
// request.
type RequestNotifierFunc func(bytesRead, bytesWritten int)
28
29// MockBroker is a mock Kafka broker that is used in unit tests. It is exposed
30// to facilitate testing of higher level or specialized consumers and producers
31// built on top of Sarama. Note that it does not 'mimic' the Kafka API protocol,
32// but rather provides a facility to do that. It takes care of the TCP
33// transport, request unmarshaling, response marshaling, and makes it the test
34// writer responsibility to program correct according to the Kafka API protocol
35// MockBroker behaviour.
36//
37// MockBroker is implemented as a TCP server listening on a kernel-selected
38// localhost port that can accept many connections. It reads Kafka requests
39// from that connection and returns responses programmed by the SetHandlerByMap
40// function. If a MockBroker receives a request that it has no programmed
41// response for, then it returns nothing and the request times out.
42//
43// A set of MockRequest builders to define mappings used by MockBroker is
44// provided by Sarama. But users can develop MockRequests of their own and use
45// them along with or instead of the standard ones.
46//
47// When running tests with MockBroker it is strongly recommended to specify
48// a timeout to `go test` so that if the broker hangs waiting for a response,
49// the test panics.
50//
51// It is not necessary to prefix message length or correlation ID to your
52// response bytes, the server does that automatically as a convenience.
53type MockBroker struct {
Scott Baker8461e152019-10-01 14:44:30 -070054 brokerID int32
55 port int32
56 closing chan none
57 stopper chan none
58 expectations chan encoder
59 listener net.Listener
60 t TestReporter
61 latency time.Duration
62 handler requestHandlerFunc
63 notifier RequestNotifierFunc
64 history []RequestResponse
65 lock sync.Mutex
66 gssApiHandler GSSApiHandlerFunc
khenaidooac637102019-01-14 15:44:34 -050067}
68
69// RequestResponse represents a Request/Response pair processed by MockBroker.
70type RequestResponse struct {
71 Request protocolBody
72 Response encoder
73}
74
75// SetLatency makes broker pause for the specified period every time before
76// replying.
77func (b *MockBroker) SetLatency(latency time.Duration) {
78 b.latency = latency
79}
80
81// SetHandlerByMap defines mapping of Request types to MockResponses. When a
82// request is received by the broker, it looks up the request type in the map
83// and uses the found MockResponse instance to generate an appropriate reply.
84// If the request type is not found in the map then nothing is sent.
85func (b *MockBroker) SetHandlerByMap(handlerMap map[string]MockResponse) {
86 b.setHandler(func(req *request) (res encoder) {
87 reqTypeName := reflect.TypeOf(req.body).Elem().Name()
88 mockResponse := handlerMap[reqTypeName]
89 if mockResponse == nil {
90 return nil
91 }
92 return mockResponse.For(req.body)
93 })
94}
95
96// SetNotifier set a function that will get invoked whenever a request has been
97// processed successfully and will provide the number of bytes read and written
98func (b *MockBroker) SetNotifier(notifier RequestNotifierFunc) {
99 b.lock.Lock()
100 b.notifier = notifier
101 b.lock.Unlock()
102}
103
104// BrokerID returns broker ID assigned to the broker.
105func (b *MockBroker) BrokerID() int32 {
106 return b.brokerID
107}
108
109// History returns a slice of RequestResponse pairs in the order they were
110// processed by the broker. Note that in case of multiple connections to the
111// broker the order expected by a test can be different from the order recorded
112// in the history, unless some synchronization is implemented in the test.
113func (b *MockBroker) History() []RequestResponse {
114 b.lock.Lock()
115 history := make([]RequestResponse, len(b.history))
116 copy(history, b.history)
117 b.lock.Unlock()
118 return history
119}
120
121// Port returns the TCP port number the broker is listening for requests on.
122func (b *MockBroker) Port() int32 {
123 return b.port
124}
125
126// Addr returns the broker connection string in the form "<address>:<port>".
127func (b *MockBroker) Addr() string {
128 return b.listener.Addr().String()
129}
130
131// Close terminates the broker blocking until it stops internal goroutines and
132// releases all resources.
133func (b *MockBroker) Close() {
134 close(b.expectations)
135 if len(b.expectations) > 0 {
136 buf := bytes.NewBufferString(fmt.Sprintf("mockbroker/%d: not all expectations were satisfied! Still waiting on:\n", b.BrokerID()))
137 for e := range b.expectations {
138 _, _ = buf.WriteString(spew.Sdump(e))
139 }
140 b.t.Error(buf.String())
141 }
142 close(b.closing)
143 <-b.stopper
144}
145
146// setHandler sets the specified function as the request handler. Whenever
147// a mock broker reads a request from the wire it passes the request to the
148// function and sends back whatever the handler function returns.
149func (b *MockBroker) setHandler(handler requestHandlerFunc) {
150 b.lock.Lock()
151 b.handler = handler
152 b.lock.Unlock()
153}
154
155func (b *MockBroker) serverLoop() {
156 defer close(b.stopper)
157 var err error
158 var conn net.Conn
159
160 go func() {
161 <-b.closing
162 err := b.listener.Close()
163 if err != nil {
164 b.t.Error(err)
165 }
166 }()
167
168 wg := &sync.WaitGroup{}
169 i := 0
170 for conn, err = b.listener.Accept(); err == nil; conn, err = b.listener.Accept() {
171 wg.Add(1)
172 go b.handleRequests(conn, i, wg)
173 i++
174 }
175 wg.Wait()
176 Logger.Printf("*** mockbroker/%d: listener closed, err=%v", b.BrokerID(), err)
177}
178
Scott Baker8461e152019-10-01 14:44:30 -0700179func (b *MockBroker) SetGSSAPIHandler(handler GSSApiHandlerFunc) {
180 b.gssApiHandler = handler
181}
182
183func (b *MockBroker) readToBytes(r io.Reader) ([]byte, error) {
184 var (
185 bytesRead int
186 lengthBytes = make([]byte, 4)
187 )
188
189 if _, err := io.ReadFull(r, lengthBytes); err != nil {
190 return nil, err
191 }
192
193 bytesRead += len(lengthBytes)
194 length := int32(binary.BigEndian.Uint32(lengthBytes))
195
196 if length <= 4 || length > MaxRequestSize {
197 return nil, PacketDecodingError{fmt.Sprintf("message of length %d too large or too small", length)}
198 }
199
200 encodedReq := make([]byte, length)
201 if _, err := io.ReadFull(r, encodedReq); err != nil {
202 return nil, err
203 }
204
205 bytesRead += len(encodedReq)
206
207 fullBytes := append(lengthBytes, encodedReq...)
208
209 return fullBytes, nil
210}
211
212func (b *MockBroker) isGSSAPI(buffer []byte) bool {
213 return buffer[4] == 0x60 || bytes.Equal(buffer[4:6], []byte{0x05, 0x04})
214}
215
khenaidooac637102019-01-14 15:44:34 -0500216func (b *MockBroker) handleRequests(conn net.Conn, idx int, wg *sync.WaitGroup) {
217 defer wg.Done()
218 defer func() {
219 _ = conn.Close()
220 }()
221 Logger.Printf("*** mockbroker/%d/%d: connection opened", b.BrokerID(), idx)
222 var err error
223
224 abort := make(chan none)
225 defer close(abort)
226 go func() {
227 select {
228 case <-b.closing:
229 _ = conn.Close()
230 case <-abort:
231 }
232 }()
233
234 resHeader := make([]byte, 8)
Scott Baker8461e152019-10-01 14:44:30 -0700235 var bytesWritten int
236 var bytesRead int
khenaidooac637102019-01-14 15:44:34 -0500237 for {
Scott Baker8461e152019-10-01 14:44:30 -0700238
239 buffer, err := b.readToBytes(conn)
khenaidooac637102019-01-14 15:44:34 -0500240 if err != nil {
Scott Baker8461e152019-10-01 14:44:30 -0700241 Logger.Printf("*** mockbroker/%d/%d: invalid request: err=%+v, %+v", b.brokerID, idx, err, spew.Sdump(buffer))
khenaidooac637102019-01-14 15:44:34 -0500242 b.serverError(err)
243 break
244 }
245
Scott Baker8461e152019-10-01 14:44:30 -0700246 bytesWritten = 0
247 if !b.isGSSAPI(buffer) {
khenaidooac637102019-01-14 15:44:34 -0500248
Scott Baker8461e152019-10-01 14:44:30 -0700249 req, br, err := decodeRequest(bytes.NewReader(buffer))
250 bytesRead = br
251 if err != nil {
252 Logger.Printf("*** mockbroker/%d/%d: invalid request: err=%+v, %+v", b.brokerID, idx, err, spew.Sdump(req))
253 b.serverError(err)
254 break
khenaidooac637102019-01-14 15:44:34 -0500255 }
khenaidooac637102019-01-14 15:44:34 -0500256
Scott Baker8461e152019-10-01 14:44:30 -0700257 if b.latency > 0 {
258 time.Sleep(b.latency)
259 }
260
261 b.lock.Lock()
262 res := b.handler(req)
263 b.history = append(b.history, RequestResponse{req.body, res})
264 b.lock.Unlock()
265
266 if res == nil {
267 Logger.Printf("*** mockbroker/%d/%d: ignored %v", b.brokerID, idx, spew.Sdump(req))
268 continue
269 }
270 Logger.Printf("*** mockbroker/%d/%d: served %v -> %v", b.brokerID, idx, req, res)
271
272 encodedRes, err := encode(res, nil)
273 if err != nil {
274 b.serverError(err)
275 break
276 }
277 if len(encodedRes) == 0 {
278 b.lock.Lock()
279 if b.notifier != nil {
280 b.notifier(bytesRead, 0)
281 }
282 b.lock.Unlock()
283 continue
284 }
285
286 binary.BigEndian.PutUint32(resHeader, uint32(len(encodedRes)+4))
287 binary.BigEndian.PutUint32(resHeader[4:], uint32(req.correlationID))
288 if _, err = conn.Write(resHeader); err != nil {
289 b.serverError(err)
290 break
291 }
292 if _, err = conn.Write(encodedRes); err != nil {
293 b.serverError(err)
294 break
295 }
296 bytesWritten = len(resHeader) + len(encodedRes)
297
298 } else {
299 // GSSAPI is not part of kafka protocol, but is supported for authentication proposes.
300 // Don't support history for this kind of request as is only used for test GSSAPI authentication mechanism
301 b.lock.Lock()
302 res := b.gssApiHandler(buffer)
303 b.lock.Unlock()
304 if res == nil {
305 Logger.Printf("*** mockbroker/%d/%d: ignored %v", b.brokerID, idx, spew.Sdump(buffer))
306 continue
307 }
308 if _, err = conn.Write(res); err != nil {
309 b.serverError(err)
310 break
311 }
312 bytesWritten = len(res)
khenaidooac637102019-01-14 15:44:34 -0500313 }
314
315 b.lock.Lock()
316 if b.notifier != nil {
Scott Baker8461e152019-10-01 14:44:30 -0700317 b.notifier(bytesRead, bytesWritten)
khenaidooac637102019-01-14 15:44:34 -0500318 }
319 b.lock.Unlock()
Scott Baker8461e152019-10-01 14:44:30 -0700320
khenaidooac637102019-01-14 15:44:34 -0500321 }
322 Logger.Printf("*** mockbroker/%d/%d: connection closed, err=%v", b.BrokerID(), idx, err)
323}
324
325func (b *MockBroker) defaultRequestHandler(req *request) (res encoder) {
326 select {
327 case res, ok := <-b.expectations:
328 if !ok {
329 return nil
330 }
331 return res
332 case <-time.After(expectationTimeout):
333 return nil
334 }
335}
336
337func (b *MockBroker) serverError(err error) {
338 isConnectionClosedError := false
339 if _, ok := err.(*net.OpError); ok {
340 isConnectionClosedError = true
341 } else if err == io.EOF {
342 isConnectionClosedError = true
343 } else if err.Error() == "use of closed network connection" {
344 isConnectionClosedError = true
345 }
346
347 if isConnectionClosedError {
348 return
349 }
350
351 b.t.Errorf(err.Error())
352}
353
354// NewMockBroker launches a fake Kafka broker. It takes a TestReporter as provided by the
355// test framework and a channel of responses to use. If an error occurs it is
356// simply logged to the TestReporter and the broker exits.
357func NewMockBroker(t TestReporter, brokerID int32) *MockBroker {
358 return NewMockBrokerAddr(t, brokerID, "localhost:0")
359}
360
361// NewMockBrokerAddr behaves like newMockBroker but listens on the address you give
362// it rather than just some ephemeral port.
363func NewMockBrokerAddr(t TestReporter, brokerID int32, addr string) *MockBroker {
364 listener, err := net.Listen("tcp", addr)
365 if err != nil {
366 t.Fatal(err)
367 }
368 return NewMockBrokerListener(t, brokerID, listener)
369}
370
371// NewMockBrokerListener behaves like newMockBrokerAddr but accepts connections on the listener specified.
372func NewMockBrokerListener(t TestReporter, brokerID int32, listener net.Listener) *MockBroker {
373 var err error
374
375 broker := &MockBroker{
376 closing: make(chan none),
377 stopper: make(chan none),
378 t: t,
379 brokerID: brokerID,
380 expectations: make(chan encoder, 512),
381 listener: listener,
382 }
383 broker.handler = broker.defaultRequestHandler
384
385 Logger.Printf("*** mockbroker/%d listening on %s\n", brokerID, broker.listener.Addr().String())
386 _, portStr, err := net.SplitHostPort(broker.listener.Addr().String())
387 if err != nil {
388 t.Fatal(err)
389 }
390 tmp, err := strconv.ParseInt(portStr, 10, 32)
391 if err != nil {
392 t.Fatal(err)
393 }
394 broker.port = int32(tmp)
395
396 go broker.serverLoop()
397
398 return broker
399}
400
401func (b *MockBroker) Returns(e encoder) {
402 b.expectations <- e
403}