blob: 9166f6efbca9927965ba4fc2f59241e5ab85aae1 [file] [log] [blame]
kesavand2cde6582020-06-22 04:56:23 -04001package sarama
2
3import (
4 "bytes"
5 "encoding/binary"
6 "fmt"
7 "io"
8 "net"
9 "reflect"
10 "strconv"
11 "sync"
12 "time"
13
14 "github.com/davecgh/go-spew/spew"
15)
16
17const (
18 expectationTimeout = 500 * time.Millisecond
19)
20
21type GSSApiHandlerFunc func([]byte) []byte
22
kesavandc71914f2022-03-25 11:19:03 +053023type requestHandlerFunc func(req *request) (res encoderWithHeader)
kesavand2cde6582020-06-22 04:56:23 -040024
25// RequestNotifierFunc is invoked when a mock broker processes a request successfully
26// and will provides the number of bytes read and written.
27type RequestNotifierFunc func(bytesRead, bytesWritten int)
28
29// MockBroker is a mock Kafka broker that is used in unit tests. It is exposed
30// to facilitate testing of higher level or specialized consumers and producers
31// built on top of Sarama. Note that it does not 'mimic' the Kafka API protocol,
32// but rather provides a facility to do that. It takes care of the TCP
kesavandc71914f2022-03-25 11:19:03 +053033// transport, request unmarshalling, response marshaling, and makes it the test
kesavand2cde6582020-06-22 04:56:23 -040034// writer responsibility to program correct according to the Kafka API protocol
kesavandc71914f2022-03-25 11:19:03 +053035// MockBroker behavior.
kesavand2cde6582020-06-22 04:56:23 -040036//
37// MockBroker is implemented as a TCP server listening on a kernel-selected
38// localhost port that can accept many connections. It reads Kafka requests
39// from that connection and returns responses programmed by the SetHandlerByMap
40// function. If a MockBroker receives a request that it has no programmed
41// response for, then it returns nothing and the request times out.
42//
43// A set of MockRequest builders to define mappings used by MockBroker is
44// provided by Sarama. But users can develop MockRequests of their own and use
45// them along with or instead of the standard ones.
46//
47// When running tests with MockBroker it is strongly recommended to specify
48// a timeout to `go test` so that if the broker hangs waiting for a response,
49// the test panics.
50//
51// It is not necessary to prefix message length or correlation ID to your
52// response bytes, the server does that automatically as a convenience.
53type MockBroker struct {
54 brokerID int32
55 port int32
56 closing chan none
57 stopper chan none
kesavandc71914f2022-03-25 11:19:03 +053058 expectations chan encoderWithHeader
kesavand2cde6582020-06-22 04:56:23 -040059 listener net.Listener
60 t TestReporter
61 latency time.Duration
62 handler requestHandlerFunc
63 notifier RequestNotifierFunc
64 history []RequestResponse
65 lock sync.Mutex
66 gssApiHandler GSSApiHandlerFunc
67}
68
69// RequestResponse represents a Request/Response pair processed by MockBroker.
70type RequestResponse struct {
71 Request protocolBody
72 Response encoder
73}
74
75// SetLatency makes broker pause for the specified period every time before
76// replying.
77func (b *MockBroker) SetLatency(latency time.Duration) {
78 b.latency = latency
79}
80
81// SetHandlerByMap defines mapping of Request types to MockResponses. When a
82// request is received by the broker, it looks up the request type in the map
83// and uses the found MockResponse instance to generate an appropriate reply.
84// If the request type is not found in the map then nothing is sent.
85func (b *MockBroker) SetHandlerByMap(handlerMap map[string]MockResponse) {
kesavandc71914f2022-03-25 11:19:03 +053086 fnMap := make(map[string]MockResponse)
87 for k, v := range handlerMap {
88 fnMap[k] = v
89 }
90 b.setHandler(func(req *request) (res encoderWithHeader) {
kesavand2cde6582020-06-22 04:56:23 -040091 reqTypeName := reflect.TypeOf(req.body).Elem().Name()
kesavandc71914f2022-03-25 11:19:03 +053092 mockResponse := fnMap[reqTypeName]
kesavand2cde6582020-06-22 04:56:23 -040093 if mockResponse == nil {
94 return nil
95 }
96 return mockResponse.For(req.body)
97 })
98}
99
100// SetNotifier set a function that will get invoked whenever a request has been
101// processed successfully and will provide the number of bytes read and written
102func (b *MockBroker) SetNotifier(notifier RequestNotifierFunc) {
103 b.lock.Lock()
104 b.notifier = notifier
105 b.lock.Unlock()
106}
107
108// BrokerID returns broker ID assigned to the broker.
109func (b *MockBroker) BrokerID() int32 {
110 return b.brokerID
111}
112
113// History returns a slice of RequestResponse pairs in the order they were
114// processed by the broker. Note that in case of multiple connections to the
115// broker the order expected by a test can be different from the order recorded
116// in the history, unless some synchronization is implemented in the test.
117func (b *MockBroker) History() []RequestResponse {
118 b.lock.Lock()
119 history := make([]RequestResponse, len(b.history))
120 copy(history, b.history)
121 b.lock.Unlock()
122 return history
123}
124
125// Port returns the TCP port number the broker is listening for requests on.
126func (b *MockBroker) Port() int32 {
127 return b.port
128}
129
130// Addr returns the broker connection string in the form "<address>:<port>".
131func (b *MockBroker) Addr() string {
132 return b.listener.Addr().String()
133}
134
135// Close terminates the broker blocking until it stops internal goroutines and
136// releases all resources.
137func (b *MockBroker) Close() {
138 close(b.expectations)
139 if len(b.expectations) > 0 {
140 buf := bytes.NewBufferString(fmt.Sprintf("mockbroker/%d: not all expectations were satisfied! Still waiting on:\n", b.BrokerID()))
141 for e := range b.expectations {
142 _, _ = buf.WriteString(spew.Sdump(e))
143 }
144 b.t.Error(buf.String())
145 }
146 close(b.closing)
147 <-b.stopper
148}
149
150// setHandler sets the specified function as the request handler. Whenever
151// a mock broker reads a request from the wire it passes the request to the
152// function and sends back whatever the handler function returns.
153func (b *MockBroker) setHandler(handler requestHandlerFunc) {
154 b.lock.Lock()
155 b.handler = handler
156 b.lock.Unlock()
157}
158
159func (b *MockBroker) serverLoop() {
160 defer close(b.stopper)
161 var err error
162 var conn net.Conn
163
164 go func() {
165 <-b.closing
166 err := b.listener.Close()
167 if err != nil {
168 b.t.Error(err)
169 }
170 }()
171
172 wg := &sync.WaitGroup{}
173 i := 0
174 for conn, err = b.listener.Accept(); err == nil; conn, err = b.listener.Accept() {
175 wg.Add(1)
176 go b.handleRequests(conn, i, wg)
177 i++
178 }
179 wg.Wait()
180 Logger.Printf("*** mockbroker/%d: listener closed, err=%v", b.BrokerID(), err)
181}
182
183func (b *MockBroker) SetGSSAPIHandler(handler GSSApiHandlerFunc) {
184 b.gssApiHandler = handler
185}
186
187func (b *MockBroker) readToBytes(r io.Reader) ([]byte, error) {
188 var (
189 bytesRead int
190 lengthBytes = make([]byte, 4)
191 )
192
193 if _, err := io.ReadFull(r, lengthBytes); err != nil {
194 return nil, err
195 }
196
197 bytesRead += len(lengthBytes)
198 length := int32(binary.BigEndian.Uint32(lengthBytes))
199
200 if length <= 4 || length > MaxRequestSize {
201 return nil, PacketDecodingError{fmt.Sprintf("message of length %d too large or too small", length)}
202 }
203
204 encodedReq := make([]byte, length)
205 if _, err := io.ReadFull(r, encodedReq); err != nil {
206 return nil, err
207 }
208
209 bytesRead += len(encodedReq)
210
211 fullBytes := append(lengthBytes, encodedReq...)
212
213 return fullBytes, nil
214}
215
216func (b *MockBroker) isGSSAPI(buffer []byte) bool {
217 return buffer[4] == 0x60 || bytes.Equal(buffer[4:6], []byte{0x05, 0x04})
218}
219
kesavandc71914f2022-03-25 11:19:03 +0530220func (b *MockBroker) handleRequests(conn io.ReadWriteCloser, idx int, wg *sync.WaitGroup) {
kesavand2cde6582020-06-22 04:56:23 -0400221 defer wg.Done()
222 defer func() {
223 _ = conn.Close()
224 }()
kesavandc71914f2022-03-25 11:19:03 +0530225 s := spew.NewDefaultConfig()
226 s.MaxDepth = 1
kesavand2cde6582020-06-22 04:56:23 -0400227 Logger.Printf("*** mockbroker/%d/%d: connection opened", b.BrokerID(), idx)
228 var err error
229
230 abort := make(chan none)
231 defer close(abort)
232 go func() {
233 select {
234 case <-b.closing:
235 _ = conn.Close()
236 case <-abort:
237 }
238 }()
239
kesavand2cde6582020-06-22 04:56:23 -0400240 var bytesWritten int
241 var bytesRead int
242 for {
kesavand2cde6582020-06-22 04:56:23 -0400243 buffer, err := b.readToBytes(conn)
244 if err != nil {
245 Logger.Printf("*** mockbroker/%d/%d: invalid request: err=%+v, %+v", b.brokerID, idx, err, spew.Sdump(buffer))
246 b.serverError(err)
247 break
248 }
249
250 bytesWritten = 0
251 if !b.isGSSAPI(buffer) {
kesavand2cde6582020-06-22 04:56:23 -0400252 req, br, err := decodeRequest(bytes.NewReader(buffer))
253 bytesRead = br
254 if err != nil {
255 Logger.Printf("*** mockbroker/%d/%d: invalid request: err=%+v, %+v", b.brokerID, idx, err, spew.Sdump(req))
256 b.serverError(err)
257 break
258 }
259
260 if b.latency > 0 {
261 time.Sleep(b.latency)
262 }
263
264 b.lock.Lock()
265 res := b.handler(req)
266 b.history = append(b.history, RequestResponse{req.body, res})
267 b.lock.Unlock()
268
269 if res == nil {
270 Logger.Printf("*** mockbroker/%d/%d: ignored %v", b.brokerID, idx, spew.Sdump(req))
271 continue
272 }
kesavandc71914f2022-03-25 11:19:03 +0530273 Logger.Printf(
274 "*** mockbroker/%d/%d: replied to %T with %T\n-> %s\n-> %s",
275 b.brokerID, idx, req.body, res,
276 s.Sprintf("%#v", req.body),
277 s.Sprintf("%#v", res),
278 )
kesavand2cde6582020-06-22 04:56:23 -0400279
280 encodedRes, err := encode(res, nil)
281 if err != nil {
282 b.serverError(err)
283 break
284 }
285 if len(encodedRes) == 0 {
286 b.lock.Lock()
287 if b.notifier != nil {
288 b.notifier(bytesRead, 0)
289 }
290 b.lock.Unlock()
291 continue
292 }
293
kesavandc71914f2022-03-25 11:19:03 +0530294 resHeader := b.encodeHeader(res.headerVersion(), req.correlationID, uint32(len(encodedRes)))
kesavand2cde6582020-06-22 04:56:23 -0400295 if _, err = conn.Write(resHeader); err != nil {
296 b.serverError(err)
297 break
298 }
299 if _, err = conn.Write(encodedRes); err != nil {
300 b.serverError(err)
301 break
302 }
303 bytesWritten = len(resHeader) + len(encodedRes)
kesavand2cde6582020-06-22 04:56:23 -0400304 } else {
305 // GSSAPI is not part of kafka protocol, but is supported for authentication proposes.
306 // Don't support history for this kind of request as is only used for test GSSAPI authentication mechanism
307 b.lock.Lock()
308 res := b.gssApiHandler(buffer)
309 b.lock.Unlock()
310 if res == nil {
311 Logger.Printf("*** mockbroker/%d/%d: ignored %v", b.brokerID, idx, spew.Sdump(buffer))
312 continue
313 }
314 if _, err = conn.Write(res); err != nil {
315 b.serverError(err)
316 break
317 }
318 bytesWritten = len(res)
319 }
320
321 b.lock.Lock()
322 if b.notifier != nil {
323 b.notifier(bytesRead, bytesWritten)
324 }
325 b.lock.Unlock()
kesavand2cde6582020-06-22 04:56:23 -0400326 }
327 Logger.Printf("*** mockbroker/%d/%d: connection closed, err=%v", b.BrokerID(), idx, err)
328}
329
kesavandc71914f2022-03-25 11:19:03 +0530330func (b *MockBroker) encodeHeader(headerVersion int16, correlationId int32, payloadLength uint32) []byte {
331 headerLength := uint32(8)
332
333 if headerVersion >= 1 {
334 headerLength = 9
335 }
336
337 resHeader := make([]byte, headerLength)
338 binary.BigEndian.PutUint32(resHeader, payloadLength+headerLength-4)
339 binary.BigEndian.PutUint32(resHeader[4:], uint32(correlationId))
340
341 if headerVersion >= 1 {
342 binary.PutUvarint(resHeader[8:], 0)
343 }
344
345 return resHeader
346}
347
348func (b *MockBroker) defaultRequestHandler(req *request) (res encoderWithHeader) {
kesavand2cde6582020-06-22 04:56:23 -0400349 select {
350 case res, ok := <-b.expectations:
351 if !ok {
352 return nil
353 }
354 return res
355 case <-time.After(expectationTimeout):
356 return nil
357 }
358}
359
360func (b *MockBroker) serverError(err error) {
361 isConnectionClosedError := false
362 if _, ok := err.(*net.OpError); ok {
363 isConnectionClosedError = true
364 } else if err == io.EOF {
365 isConnectionClosedError = true
366 } else if err.Error() == "use of closed network connection" {
367 isConnectionClosedError = true
368 }
369
370 if isConnectionClosedError {
371 return
372 }
373
374 b.t.Errorf(err.Error())
375}
376
377// NewMockBroker launches a fake Kafka broker. It takes a TestReporter as provided by the
378// test framework and a channel of responses to use. If an error occurs it is
379// simply logged to the TestReporter and the broker exits.
380func NewMockBroker(t TestReporter, brokerID int32) *MockBroker {
381 return NewMockBrokerAddr(t, brokerID, "localhost:0")
382}
383
384// NewMockBrokerAddr behaves like newMockBroker but listens on the address you give
385// it rather than just some ephemeral port.
386func NewMockBrokerAddr(t TestReporter, brokerID int32, addr string) *MockBroker {
387 listener, err := net.Listen("tcp", addr)
388 if err != nil {
389 t.Fatal(err)
390 }
391 return NewMockBrokerListener(t, brokerID, listener)
392}
393
394// NewMockBrokerListener behaves like newMockBrokerAddr but accepts connections on the listener specified.
395func NewMockBrokerListener(t TestReporter, brokerID int32, listener net.Listener) *MockBroker {
396 var err error
397
398 broker := &MockBroker{
399 closing: make(chan none),
400 stopper: make(chan none),
401 t: t,
402 brokerID: brokerID,
kesavandc71914f2022-03-25 11:19:03 +0530403 expectations: make(chan encoderWithHeader, 512),
kesavand2cde6582020-06-22 04:56:23 -0400404 listener: listener,
405 }
406 broker.handler = broker.defaultRequestHandler
407
408 Logger.Printf("*** mockbroker/%d listening on %s\n", brokerID, broker.listener.Addr().String())
409 _, portStr, err := net.SplitHostPort(broker.listener.Addr().String())
410 if err != nil {
411 t.Fatal(err)
412 }
413 tmp, err := strconv.ParseInt(portStr, 10, 32)
414 if err != nil {
415 t.Fatal(err)
416 }
417 broker.port = int32(tmp)
418
419 go broker.serverLoop()
420
421 return broker
422}
423
kesavandc71914f2022-03-25 11:19:03 +0530424func (b *MockBroker) Returns(e encoderWithHeader) {
kesavand2cde6582020-06-22 04:56:23 -0400425 b.expectations <- e
426}