SEBA-949 support for publishing bbsim events on kafka
Change-Id: I4354cd026bbadc801e4d6d08b2f9cd3462917b4c
diff --git a/vendor/github.com/Shopify/sarama/mockbroker.go b/vendor/github.com/Shopify/sarama/mockbroker.go
new file mode 100644
index 0000000..cd1a850
--- /dev/null
+++ b/vendor/github.com/Shopify/sarama/mockbroker.go
@@ -0,0 +1,399 @@
+package sarama
+
+import (
+ "bytes"
+ "encoding/binary"
+ "fmt"
+ "io"
+ "net"
+ "reflect"
+ "strconv"
+ "sync"
+ "time"
+
+ "github.com/davecgh/go-spew/spew"
+)
+
+const (
+ expectationTimeout = 500 * time.Millisecond
+)
+
+type GSSApiHandlerFunc func([]byte) []byte
+
+type requestHandlerFunc func(req *request) (res encoder)
+
+// RequestNotifierFunc is invoked when a mock broker processes a request successfully
+// and will provides the number of bytes read and written.
+type RequestNotifierFunc func(bytesRead, bytesWritten int)
+
+// MockBroker is a mock Kafka broker that is used in unit tests. It is exposed
+// to facilitate testing of higher level or specialized consumers and producers
+// built on top of Sarama. Note that it does not 'mimic' the Kafka API protocol,
+// but rather provides a facility to do that. It takes care of the TCP
+// transport, request unmarshaling, response marshaling, and makes it the test
+// writer responsibility to program correct according to the Kafka API protocol
+// MockBroker behaviour.
+//
+// MockBroker is implemented as a TCP server listening on a kernel-selected
+// localhost port that can accept many connections. It reads Kafka requests
+// from that connection and returns responses programmed by the SetHandlerByMap
+// function. If a MockBroker receives a request that it has no programmed
+// response for, then it returns nothing and the request times out.
+//
+// A set of MockRequest builders to define mappings used by MockBroker is
+// provided by Sarama. But users can develop MockRequests of their own and use
+// them along with or instead of the standard ones.
+//
+// When running tests with MockBroker it is strongly recommended to specify
+// a timeout to `go test` so that if the broker hangs waiting for a response,
+// the test panics.
+//
+// It is not necessary to prefix message length or correlation ID to your
+// response bytes, the server does that automatically as a convenience.
+type MockBroker struct {
+ brokerID int32
+ port int32
+ closing chan none
+ stopper chan none
+ expectations chan encoder
+ listener net.Listener
+ t TestReporter
+ latency time.Duration
+ handler requestHandlerFunc
+ notifier RequestNotifierFunc
+ history []RequestResponse
+ lock sync.Mutex
+ gssApiHandler GSSApiHandlerFunc
+}
+
+// RequestResponse represents a Request/Response pair processed by MockBroker.
+type RequestResponse struct {
+ Request protocolBody
+ Response encoder
+}
+
+// SetLatency makes broker pause for the specified period every time before
+// replying.
+func (b *MockBroker) SetLatency(latency time.Duration) {
+ b.latency = latency
+}
+
+// SetHandlerByMap defines mapping of Request types to MockResponses. When a
+// request is received by the broker, it looks up the request type in the map
+// and uses the found MockResponse instance to generate an appropriate reply.
+// If the request type is not found in the map then nothing is sent.
+func (b *MockBroker) SetHandlerByMap(handlerMap map[string]MockResponse) {
+ b.setHandler(func(req *request) (res encoder) {
+ reqTypeName := reflect.TypeOf(req.body).Elem().Name()
+ mockResponse := handlerMap[reqTypeName]
+ if mockResponse == nil {
+ return nil
+ }
+ return mockResponse.For(req.body)
+ })
+}
+
+// SetNotifier set a function that will get invoked whenever a request has been
+// processed successfully and will provide the number of bytes read and written
+func (b *MockBroker) SetNotifier(notifier RequestNotifierFunc) {
+ b.lock.Lock()
+ b.notifier = notifier
+ b.lock.Unlock()
+}
+
+// BrokerID returns broker ID assigned to the broker.
+func (b *MockBroker) BrokerID() int32 {
+ return b.brokerID
+}
+
+// History returns a slice of RequestResponse pairs in the order they were
+// processed by the broker. Note that in case of multiple connections to the
+// broker the order expected by a test can be different from the order recorded
+// in the history, unless some synchronization is implemented in the test.
+func (b *MockBroker) History() []RequestResponse {
+ b.lock.Lock()
+ history := make([]RequestResponse, len(b.history))
+ copy(history, b.history)
+ b.lock.Unlock()
+ return history
+}
+
+// Port returns the TCP port number the broker is listening for requests on.
+func (b *MockBroker) Port() int32 {
+ return b.port
+}
+
+// Addr returns the broker connection string in the form "<address>:<port>".
+func (b *MockBroker) Addr() string {
+ return b.listener.Addr().String()
+}
+
+// Close terminates the broker blocking until it stops internal goroutines and
+// releases all resources.
+func (b *MockBroker) Close() {
+ close(b.expectations)
+ if len(b.expectations) > 0 {
+ buf := bytes.NewBufferString(fmt.Sprintf("mockbroker/%d: not all expectations were satisfied! Still waiting on:\n", b.BrokerID()))
+ for e := range b.expectations {
+ _, _ = buf.WriteString(spew.Sdump(e))
+ }
+ b.t.Error(buf.String())
+ }
+ close(b.closing)
+ <-b.stopper
+}
+
+// setHandler sets the specified function as the request handler. Whenever
+// a mock broker reads a request from the wire it passes the request to the
+// function and sends back whatever the handler function returns.
+func (b *MockBroker) setHandler(handler requestHandlerFunc) {
+ b.lock.Lock()
+ b.handler = handler
+ b.lock.Unlock()
+}
+
+func (b *MockBroker) serverLoop() {
+ defer close(b.stopper)
+ var err error
+ var conn net.Conn
+
+ go func() {
+ <-b.closing
+ err := b.listener.Close()
+ if err != nil {
+ b.t.Error(err)
+ }
+ }()
+
+ wg := &sync.WaitGroup{}
+ i := 0
+ for conn, err = b.listener.Accept(); err == nil; conn, err = b.listener.Accept() {
+ wg.Add(1)
+ go b.handleRequests(conn, i, wg)
+ i++
+ }
+ wg.Wait()
+ Logger.Printf("*** mockbroker/%d: listener closed, err=%v", b.BrokerID(), err)
+}
+
+func (b *MockBroker) SetGSSAPIHandler(handler GSSApiHandlerFunc) {
+ b.gssApiHandler = handler
+}
+
+func (b *MockBroker) readToBytes(r io.Reader) ([]byte, error) {
+ var (
+ bytesRead int
+ lengthBytes = make([]byte, 4)
+ )
+
+ if _, err := io.ReadFull(r, lengthBytes); err != nil {
+ return nil, err
+ }
+
+ bytesRead += len(lengthBytes)
+ length := int32(binary.BigEndian.Uint32(lengthBytes))
+
+ if length <= 4 || length > MaxRequestSize {
+ return nil, PacketDecodingError{fmt.Sprintf("message of length %d too large or too small", length)}
+ }
+
+ encodedReq := make([]byte, length)
+ if _, err := io.ReadFull(r, encodedReq); err != nil {
+ return nil, err
+ }
+
+ bytesRead += len(encodedReq)
+
+ fullBytes := append(lengthBytes, encodedReq...)
+
+ return fullBytes, nil
+}
+
+func (b *MockBroker) isGSSAPI(buffer []byte) bool {
+ return buffer[4] == 0x60 || bytes.Equal(buffer[4:6], []byte{0x05, 0x04})
+}
+
+func (b *MockBroker) handleRequests(conn net.Conn, idx int, wg *sync.WaitGroup) {
+ defer wg.Done()
+ defer func() {
+ _ = conn.Close()
+ }()
+ Logger.Printf("*** mockbroker/%d/%d: connection opened", b.BrokerID(), idx)
+ var err error
+
+ abort := make(chan none)
+ defer close(abort)
+ go func() {
+ select {
+ case <-b.closing:
+ _ = conn.Close()
+ case <-abort:
+ }
+ }()
+
+ resHeader := make([]byte, 8)
+ var bytesWritten int
+ var bytesRead int
+ for {
+ buffer, err := b.readToBytes(conn)
+ if err != nil {
+ Logger.Printf("*** mockbroker/%d/%d: invalid request: err=%+v, %+v", b.brokerID, idx, err, spew.Sdump(buffer))
+ b.serverError(err)
+ break
+ }
+
+ bytesWritten = 0
+ if !b.isGSSAPI(buffer) {
+ req, br, err := decodeRequest(bytes.NewReader(buffer))
+ bytesRead = br
+ if err != nil {
+ Logger.Printf("*** mockbroker/%d/%d: invalid request: err=%+v, %+v", b.brokerID, idx, err, spew.Sdump(req))
+ b.serverError(err)
+ break
+ }
+
+ if b.latency > 0 {
+ time.Sleep(b.latency)
+ }
+
+ b.lock.Lock()
+ res := b.handler(req)
+ b.history = append(b.history, RequestResponse{req.body, res})
+ b.lock.Unlock()
+
+ if res == nil {
+ Logger.Printf("*** mockbroker/%d/%d: ignored %v", b.brokerID, idx, spew.Sdump(req))
+ continue
+ }
+ Logger.Printf("*** mockbroker/%d/%d: served %v -> %v", b.brokerID, idx, req, res)
+
+ encodedRes, err := encode(res, nil)
+ if err != nil {
+ b.serverError(err)
+ break
+ }
+ if len(encodedRes) == 0 {
+ b.lock.Lock()
+ if b.notifier != nil {
+ b.notifier(bytesRead, 0)
+ }
+ b.lock.Unlock()
+ continue
+ }
+
+ binary.BigEndian.PutUint32(resHeader, uint32(len(encodedRes)+4))
+ binary.BigEndian.PutUint32(resHeader[4:], uint32(req.correlationID))
+ if _, err = conn.Write(resHeader); err != nil {
+ b.serverError(err)
+ break
+ }
+ if _, err = conn.Write(encodedRes); err != nil {
+ b.serverError(err)
+ break
+ }
+ bytesWritten = len(resHeader) + len(encodedRes)
+ } else {
+ // GSSAPI is not part of kafka protocol, but is supported for authentication proposes.
+ // Don't support history for this kind of request as is only used for test GSSAPI authentication mechanism
+ b.lock.Lock()
+ res := b.gssApiHandler(buffer)
+ b.lock.Unlock()
+ if res == nil {
+ Logger.Printf("*** mockbroker/%d/%d: ignored %v", b.brokerID, idx, spew.Sdump(buffer))
+ continue
+ }
+ if _, err = conn.Write(res); err != nil {
+ b.serverError(err)
+ break
+ }
+ bytesWritten = len(res)
+ }
+
+ b.lock.Lock()
+ if b.notifier != nil {
+ b.notifier(bytesRead, bytesWritten)
+ }
+ b.lock.Unlock()
+ }
+ Logger.Printf("*** mockbroker/%d/%d: connection closed, err=%v", b.BrokerID(), idx, err)
+}
+
+func (b *MockBroker) defaultRequestHandler(req *request) (res encoder) {
+ select {
+ case res, ok := <-b.expectations:
+ if !ok {
+ return nil
+ }
+ return res
+ case <-time.After(expectationTimeout):
+ return nil
+ }
+}
+
+func (b *MockBroker) serverError(err error) {
+ isConnectionClosedError := false
+ if _, ok := err.(*net.OpError); ok {
+ isConnectionClosedError = true
+ } else if err == io.EOF {
+ isConnectionClosedError = true
+ } else if err.Error() == "use of closed network connection" {
+ isConnectionClosedError = true
+ }
+
+ if isConnectionClosedError {
+ return
+ }
+
+ b.t.Errorf(err.Error())
+}
+
+// NewMockBroker launches a fake Kafka broker. It takes a TestReporter as provided by the
+// test framework and a channel of responses to use. If an error occurs it is
+// simply logged to the TestReporter and the broker exits.
+func NewMockBroker(t TestReporter, brokerID int32) *MockBroker {
+ return NewMockBrokerAddr(t, brokerID, "localhost:0")
+}
+
+// NewMockBrokerAddr behaves like newMockBroker but listens on the address you give
+// it rather than just some ephemeral port.
+func NewMockBrokerAddr(t TestReporter, brokerID int32, addr string) *MockBroker {
+ listener, err := net.Listen("tcp", addr)
+ if err != nil {
+ t.Fatal(err)
+ }
+ return NewMockBrokerListener(t, brokerID, listener)
+}
+
+// NewMockBrokerListener behaves like newMockBrokerAddr but accepts connections on the listener specified.
+func NewMockBrokerListener(t TestReporter, brokerID int32, listener net.Listener) *MockBroker {
+ var err error
+
+ broker := &MockBroker{
+ closing: make(chan none),
+ stopper: make(chan none),
+ t: t,
+ brokerID: brokerID,
+ expectations: make(chan encoder, 512),
+ listener: listener,
+ }
+ broker.handler = broker.defaultRequestHandler
+
+ Logger.Printf("*** mockbroker/%d listening on %s\n", brokerID, broker.listener.Addr().String())
+ _, portStr, err := net.SplitHostPort(broker.listener.Addr().String())
+ if err != nil {
+ t.Fatal(err)
+ }
+ tmp, err := strconv.ParseInt(portStr, 10, 32)
+ if err != nil {
+ t.Fatal(err)
+ }
+ broker.port = int32(tmp)
+
+ go broker.serverLoop()
+
+ return broker
+}
+
+func (b *MockBroker) Returns(e encoder) {
+ b.expectations <- e
+}