blob: c2654d12edb76749263c4ae846fdce012e324cdc [file] [log] [blame]
Scott Baker2c1c4822019-10-16 11:02:41 -07001package sarama
2
3import (
4 "bytes"
5 "encoding/binary"
6 "fmt"
7 "io"
8 "net"
9 "reflect"
10 "strconv"
11 "sync"
12 "time"
13
14 "github.com/davecgh/go-spew/spew"
15)
16
17const (
18 expectationTimeout = 500 * time.Millisecond
19)
20
21type GSSApiHandlerFunc func([]byte) []byte
22
khenaidoo26721882021-08-11 17:42:52 -040023type requestHandlerFunc func(req *request) (res encoderWithHeader)
Scott Baker2c1c4822019-10-16 11:02:41 -070024
25// RequestNotifierFunc is invoked when a mock broker processes a request successfully
26// and will provides the number of bytes read and written.
27type RequestNotifierFunc func(bytesRead, bytesWritten int)
28
29// MockBroker is a mock Kafka broker that is used in unit tests. It is exposed
30// to facilitate testing of higher level or specialized consumers and producers
31// built on top of Sarama. Note that it does not 'mimic' the Kafka API protocol,
32// but rather provides a facility to do that. It takes care of the TCP
khenaidoo26721882021-08-11 17:42:52 -040033// transport, request unmarshalling, response marshalling, and makes it the test
Scott Baker2c1c4822019-10-16 11:02:41 -070034// writer responsibility to program correct according to the Kafka API protocol
35// MockBroker behaviour.
36//
37// MockBroker is implemented as a TCP server listening on a kernel-selected
38// localhost port that can accept many connections. It reads Kafka requests
39// from that connection and returns responses programmed by the SetHandlerByMap
40// function. If a MockBroker receives a request that it has no programmed
41// response for, then it returns nothing and the request times out.
42//
43// A set of MockRequest builders to define mappings used by MockBroker is
44// provided by Sarama. But users can develop MockRequests of their own and use
45// them along with or instead of the standard ones.
46//
47// When running tests with MockBroker it is strongly recommended to specify
48// a timeout to `go test` so that if the broker hangs waiting for a response,
49// the test panics.
50//
51// It is not necessary to prefix message length or correlation ID to your
52// response bytes, the server does that automatically as a convenience.
53type MockBroker struct {
54 brokerID int32
55 port int32
56 closing chan none
57 stopper chan none
khenaidoo26721882021-08-11 17:42:52 -040058 expectations chan encoderWithHeader
Scott Baker2c1c4822019-10-16 11:02:41 -070059 listener net.Listener
60 t TestReporter
61 latency time.Duration
62 handler requestHandlerFunc
63 notifier RequestNotifierFunc
64 history []RequestResponse
65 lock sync.Mutex
66 gssApiHandler GSSApiHandlerFunc
67}
68
69// RequestResponse represents a Request/Response pair processed by MockBroker.
70type RequestResponse struct {
71 Request protocolBody
72 Response encoder
73}
74
75// SetLatency makes broker pause for the specified period every time before
76// replying.
77func (b *MockBroker) SetLatency(latency time.Duration) {
78 b.latency = latency
79}
80
81// SetHandlerByMap defines mapping of Request types to MockResponses. When a
82// request is received by the broker, it looks up the request type in the map
83// and uses the found MockResponse instance to generate an appropriate reply.
84// If the request type is not found in the map then nothing is sent.
85func (b *MockBroker) SetHandlerByMap(handlerMap map[string]MockResponse) {
khenaidoo26721882021-08-11 17:42:52 -040086 b.setHandler(func(req *request) (res encoderWithHeader) {
Scott Baker2c1c4822019-10-16 11:02:41 -070087 reqTypeName := reflect.TypeOf(req.body).Elem().Name()
88 mockResponse := handlerMap[reqTypeName]
89 if mockResponse == nil {
90 return nil
91 }
92 return mockResponse.For(req.body)
93 })
94}
95
96// SetNotifier set a function that will get invoked whenever a request has been
97// processed successfully and will provide the number of bytes read and written
98func (b *MockBroker) SetNotifier(notifier RequestNotifierFunc) {
99 b.lock.Lock()
100 b.notifier = notifier
101 b.lock.Unlock()
102}
103
104// BrokerID returns broker ID assigned to the broker.
105func (b *MockBroker) BrokerID() int32 {
106 return b.brokerID
107}
108
109// History returns a slice of RequestResponse pairs in the order they were
110// processed by the broker. Note that in case of multiple connections to the
111// broker the order expected by a test can be different from the order recorded
112// in the history, unless some synchronization is implemented in the test.
113func (b *MockBroker) History() []RequestResponse {
114 b.lock.Lock()
115 history := make([]RequestResponse, len(b.history))
116 copy(history, b.history)
117 b.lock.Unlock()
118 return history
119}
120
121// Port returns the TCP port number the broker is listening for requests on.
122func (b *MockBroker) Port() int32 {
123 return b.port
124}
125
126// Addr returns the broker connection string in the form "<address>:<port>".
127func (b *MockBroker) Addr() string {
128 return b.listener.Addr().String()
129}
130
131// Close terminates the broker blocking until it stops internal goroutines and
132// releases all resources.
133func (b *MockBroker) Close() {
134 close(b.expectations)
135 if len(b.expectations) > 0 {
136 buf := bytes.NewBufferString(fmt.Sprintf("mockbroker/%d: not all expectations were satisfied! Still waiting on:\n", b.BrokerID()))
137 for e := range b.expectations {
138 _, _ = buf.WriteString(spew.Sdump(e))
139 }
140 b.t.Error(buf.String())
141 }
142 close(b.closing)
143 <-b.stopper
144}
145
146// setHandler sets the specified function as the request handler. Whenever
147// a mock broker reads a request from the wire it passes the request to the
148// function and sends back whatever the handler function returns.
149func (b *MockBroker) setHandler(handler requestHandlerFunc) {
150 b.lock.Lock()
151 b.handler = handler
152 b.lock.Unlock()
153}
154
155func (b *MockBroker) serverLoop() {
156 defer close(b.stopper)
157 var err error
158 var conn net.Conn
159
160 go func() {
161 <-b.closing
162 err := b.listener.Close()
163 if err != nil {
164 b.t.Error(err)
165 }
166 }()
167
168 wg := &sync.WaitGroup{}
169 i := 0
170 for conn, err = b.listener.Accept(); err == nil; conn, err = b.listener.Accept() {
171 wg.Add(1)
172 go b.handleRequests(conn, i, wg)
173 i++
174 }
175 wg.Wait()
176 Logger.Printf("*** mockbroker/%d: listener closed, err=%v", b.BrokerID(), err)
177}
178
179func (b *MockBroker) SetGSSAPIHandler(handler GSSApiHandlerFunc) {
180 b.gssApiHandler = handler
181}
182
183func (b *MockBroker) readToBytes(r io.Reader) ([]byte, error) {
184 var (
185 bytesRead int
186 lengthBytes = make([]byte, 4)
187 )
188
189 if _, err := io.ReadFull(r, lengthBytes); err != nil {
190 return nil, err
191 }
192
193 bytesRead += len(lengthBytes)
194 length := int32(binary.BigEndian.Uint32(lengthBytes))
195
196 if length <= 4 || length > MaxRequestSize {
197 return nil, PacketDecodingError{fmt.Sprintf("message of length %d too large or too small", length)}
198 }
199
200 encodedReq := make([]byte, length)
201 if _, err := io.ReadFull(r, encodedReq); err != nil {
202 return nil, err
203 }
204
205 bytesRead += len(encodedReq)
206
207 fullBytes := append(lengthBytes, encodedReq...)
208
209 return fullBytes, nil
210}
211
212func (b *MockBroker) isGSSAPI(buffer []byte) bool {
213 return buffer[4] == 0x60 || bytes.Equal(buffer[4:6], []byte{0x05, 0x04})
214}
215
khenaidoo26721882021-08-11 17:42:52 -0400216func (b *MockBroker) handleRequests(conn io.ReadWriteCloser, idx int, wg *sync.WaitGroup) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700217 defer wg.Done()
218 defer func() {
219 _ = conn.Close()
220 }()
khenaidoo26721882021-08-11 17:42:52 -0400221 s := spew.NewDefaultConfig()
222 s.MaxDepth = 1
Scott Baker2c1c4822019-10-16 11:02:41 -0700223 Logger.Printf("*** mockbroker/%d/%d: connection opened", b.BrokerID(), idx)
224 var err error
225
226 abort := make(chan none)
227 defer close(abort)
228 go func() {
229 select {
230 case <-b.closing:
231 _ = conn.Close()
232 case <-abort:
233 }
234 }()
235
Scott Baker2c1c4822019-10-16 11:02:41 -0700236 var bytesWritten int
237 var bytesRead int
238 for {
Scott Baker2c1c4822019-10-16 11:02:41 -0700239 buffer, err := b.readToBytes(conn)
240 if err != nil {
241 Logger.Printf("*** mockbroker/%d/%d: invalid request: err=%+v, %+v", b.brokerID, idx, err, spew.Sdump(buffer))
242 b.serverError(err)
243 break
244 }
245
246 bytesWritten = 0
247 if !b.isGSSAPI(buffer) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700248 req, br, err := decodeRequest(bytes.NewReader(buffer))
249 bytesRead = br
250 if err != nil {
251 Logger.Printf("*** mockbroker/%d/%d: invalid request: err=%+v, %+v", b.brokerID, idx, err, spew.Sdump(req))
252 b.serverError(err)
253 break
254 }
255
256 if b.latency > 0 {
257 time.Sleep(b.latency)
258 }
259
260 b.lock.Lock()
261 res := b.handler(req)
262 b.history = append(b.history, RequestResponse{req.body, res})
263 b.lock.Unlock()
264
265 if res == nil {
266 Logger.Printf("*** mockbroker/%d/%d: ignored %v", b.brokerID, idx, spew.Sdump(req))
267 continue
268 }
khenaidoo26721882021-08-11 17:42:52 -0400269 Logger.Printf(
270 "*** mockbroker/%d/%d: replied to %T with %T\n-> %s\n-> %s",
271 b.brokerID, idx, req.body, res,
272 s.Sprintf("%#v", req.body),
273 s.Sprintf("%#v", res),
274 )
Scott Baker2c1c4822019-10-16 11:02:41 -0700275
276 encodedRes, err := encode(res, nil)
277 if err != nil {
278 b.serverError(err)
279 break
280 }
281 if len(encodedRes) == 0 {
282 b.lock.Lock()
283 if b.notifier != nil {
284 b.notifier(bytesRead, 0)
285 }
286 b.lock.Unlock()
287 continue
288 }
289
khenaidoo26721882021-08-11 17:42:52 -0400290 resHeader := b.encodeHeader(res.headerVersion(), req.correlationID, uint32(len(encodedRes)))
Scott Baker2c1c4822019-10-16 11:02:41 -0700291 if _, err = conn.Write(resHeader); err != nil {
292 b.serverError(err)
293 break
294 }
295 if _, err = conn.Write(encodedRes); err != nil {
296 b.serverError(err)
297 break
298 }
299 bytesWritten = len(resHeader) + len(encodedRes)
Scott Baker2c1c4822019-10-16 11:02:41 -0700300 } else {
301 // GSSAPI is not part of kafka protocol, but is supported for authentication proposes.
302 // Don't support history for this kind of request as is only used for test GSSAPI authentication mechanism
303 b.lock.Lock()
304 res := b.gssApiHandler(buffer)
305 b.lock.Unlock()
306 if res == nil {
307 Logger.Printf("*** mockbroker/%d/%d: ignored %v", b.brokerID, idx, spew.Sdump(buffer))
308 continue
309 }
310 if _, err = conn.Write(res); err != nil {
311 b.serverError(err)
312 break
313 }
314 bytesWritten = len(res)
315 }
316
317 b.lock.Lock()
318 if b.notifier != nil {
319 b.notifier(bytesRead, bytesWritten)
320 }
321 b.lock.Unlock()
Scott Baker2c1c4822019-10-16 11:02:41 -0700322 }
323 Logger.Printf("*** mockbroker/%d/%d: connection closed, err=%v", b.BrokerID(), idx, err)
324}
325
khenaidoo26721882021-08-11 17:42:52 -0400326func (b *MockBroker) encodeHeader(headerVersion int16, correlationId int32, payloadLength uint32) []byte {
327 headerLength := uint32(8)
328
329 if headerVersion >= 1 {
330 headerLength = 9
331 }
332
333 resHeader := make([]byte, headerLength)
334 binary.BigEndian.PutUint32(resHeader, payloadLength+headerLength-4)
335 binary.BigEndian.PutUint32(resHeader[4:], uint32(correlationId))
336
337 if headerVersion >= 1 {
338 binary.PutUvarint(resHeader[8:], 0)
339 }
340
341 return resHeader
342}
343
344func (b *MockBroker) defaultRequestHandler(req *request) (res encoderWithHeader) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700345 select {
346 case res, ok := <-b.expectations:
347 if !ok {
348 return nil
349 }
350 return res
351 case <-time.After(expectationTimeout):
352 return nil
353 }
354}
355
356func (b *MockBroker) serverError(err error) {
357 isConnectionClosedError := false
358 if _, ok := err.(*net.OpError); ok {
359 isConnectionClosedError = true
360 } else if err == io.EOF {
361 isConnectionClosedError = true
362 } else if err.Error() == "use of closed network connection" {
363 isConnectionClosedError = true
364 }
365
366 if isConnectionClosedError {
367 return
368 }
369
370 b.t.Errorf(err.Error())
371}
372
373// NewMockBroker launches a fake Kafka broker. It takes a TestReporter as provided by the
374// test framework and a channel of responses to use. If an error occurs it is
375// simply logged to the TestReporter and the broker exits.
376func NewMockBroker(t TestReporter, brokerID int32) *MockBroker {
377 return NewMockBrokerAddr(t, brokerID, "localhost:0")
378}
379
380// NewMockBrokerAddr behaves like newMockBroker but listens on the address you give
381// it rather than just some ephemeral port.
382func NewMockBrokerAddr(t TestReporter, brokerID int32, addr string) *MockBroker {
383 listener, err := net.Listen("tcp", addr)
384 if err != nil {
385 t.Fatal(err)
386 }
387 return NewMockBrokerListener(t, brokerID, listener)
388}
389
390// NewMockBrokerListener behaves like newMockBrokerAddr but accepts connections on the listener specified.
391func NewMockBrokerListener(t TestReporter, brokerID int32, listener net.Listener) *MockBroker {
392 var err error
393
394 broker := &MockBroker{
395 closing: make(chan none),
396 stopper: make(chan none),
397 t: t,
398 brokerID: brokerID,
khenaidoo26721882021-08-11 17:42:52 -0400399 expectations: make(chan encoderWithHeader, 512),
Scott Baker2c1c4822019-10-16 11:02:41 -0700400 listener: listener,
401 }
402 broker.handler = broker.defaultRequestHandler
403
404 Logger.Printf("*** mockbroker/%d listening on %s\n", brokerID, broker.listener.Addr().String())
405 _, portStr, err := net.SplitHostPort(broker.listener.Addr().String())
406 if err != nil {
407 t.Fatal(err)
408 }
409 tmp, err := strconv.ParseInt(portStr, 10, 32)
410 if err != nil {
411 t.Fatal(err)
412 }
413 broker.port = int32(tmp)
414
415 go broker.serverLoop()
416
417 return broker
418}
419
khenaidoo26721882021-08-11 17:42:52 -0400420func (b *MockBroker) Returns(e encoderWithHeader) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700421 b.expectations <- e
422}