[VOL-4290] Voltha go library updates for gRPC migration
Change-Id: I1aa2774beb6b7ed7419bc45aeb53fcae8a8ecda0
diff --git a/vendor/github.com/Shopify/sarama/mockbroker.go b/vendor/github.com/Shopify/sarama/mockbroker.go
index 4ed46a6..c2654d1 100644
--- a/vendor/github.com/Shopify/sarama/mockbroker.go
+++ b/vendor/github.com/Shopify/sarama/mockbroker.go
@@ -20,7 +20,7 @@
type GSSApiHandlerFunc func([]byte) []byte
-type requestHandlerFunc func(req *request) (res encoder)
+type requestHandlerFunc func(req *request) (res encoderWithHeader)
// RequestNotifierFunc is invoked when a mock broker processes a request successfully
// and will provides the number of bytes read and written.
@@ -30,7 +30,7 @@
// to facilitate testing of higher level or specialized consumers and producers
// built on top of Sarama. Note that it does not 'mimic' the Kafka API protocol,
// but rather provides a facility to do that. It takes care of the TCP
-// transport, request unmarshaling, response marshaling, and makes it the test
+// transport, request unmarshalling, response marshalling, and makes it the test
// writer responsibility to program correct according to the Kafka API protocol
// MockBroker behaviour.
//
@@ -55,7 +55,7 @@
port int32
closing chan none
stopper chan none
- expectations chan encoder
+ expectations chan encoderWithHeader
listener net.Listener
t TestReporter
latency time.Duration
@@ -83,7 +83,7 @@
// and uses the found MockResponse instance to generate an appropriate reply.
// If the request type is not found in the map then nothing is sent.
func (b *MockBroker) SetHandlerByMap(handlerMap map[string]MockResponse) {
- b.setHandler(func(req *request) (res encoder) {
+ b.setHandler(func(req *request) (res encoderWithHeader) {
reqTypeName := reflect.TypeOf(req.body).Elem().Name()
mockResponse := handlerMap[reqTypeName]
if mockResponse == nil {
@@ -213,11 +213,13 @@
return buffer[4] == 0x60 || bytes.Equal(buffer[4:6], []byte{0x05, 0x04})
}
-func (b *MockBroker) handleRequests(conn net.Conn, idx int, wg *sync.WaitGroup) {
+func (b *MockBroker) handleRequests(conn io.ReadWriteCloser, idx int, wg *sync.WaitGroup) {
defer wg.Done()
defer func() {
_ = conn.Close()
}()
+ s := spew.NewDefaultConfig()
+ s.MaxDepth = 1
Logger.Printf("*** mockbroker/%d/%d: connection opened", b.BrokerID(), idx)
var err error
@@ -231,11 +233,9 @@
}
}()
- resHeader := make([]byte, 8)
var bytesWritten int
var bytesRead int
for {
-
buffer, err := b.readToBytes(conn)
if err != nil {
Logger.Printf("*** mockbroker/%d/%d: invalid request: err=%+v, %+v", b.brokerID, idx, err, spew.Sdump(buffer))
@@ -245,7 +245,6 @@
bytesWritten = 0
if !b.isGSSAPI(buffer) {
-
req, br, err := decodeRequest(bytes.NewReader(buffer))
bytesRead = br
if err != nil {
@@ -267,7 +266,12 @@
Logger.Printf("*** mockbroker/%d/%d: ignored %v", b.brokerID, idx, spew.Sdump(req))
continue
}
- Logger.Printf("*** mockbroker/%d/%d: served %v -> %v", b.brokerID, idx, req, res)
+ Logger.Printf(
+ "*** mockbroker/%d/%d: replied to %T with %T\n-> %s\n-> %s",
+ b.brokerID, idx, req.body, res,
+ s.Sprintf("%#v", req.body),
+ s.Sprintf("%#v", res),
+ )
encodedRes, err := encode(res, nil)
if err != nil {
@@ -283,8 +287,7 @@
continue
}
- binary.BigEndian.PutUint32(resHeader, uint32(len(encodedRes)+4))
- binary.BigEndian.PutUint32(resHeader[4:], uint32(req.correlationID))
+ resHeader := b.encodeHeader(res.headerVersion(), req.correlationID, uint32(len(encodedRes)))
if _, err = conn.Write(resHeader); err != nil {
b.serverError(err)
break
@@ -294,7 +297,6 @@
break
}
bytesWritten = len(resHeader) + len(encodedRes)
-
} else {
// GSSAPI is not part of kafka protocol, but is supported for authentication proposes.
// Don't support history for this kind of request as is only used for test GSSAPI authentication mechanism
@@ -317,12 +319,29 @@
b.notifier(bytesRead, bytesWritten)
}
b.lock.Unlock()
-
}
Logger.Printf("*** mockbroker/%d/%d: connection closed, err=%v", b.BrokerID(), idx, err)
}
-func (b *MockBroker) defaultRequestHandler(req *request) (res encoder) {
+func (b *MockBroker) encodeHeader(headerVersion int16, correlationId int32, payloadLength uint32) []byte {
+ headerLength := uint32(8)
+
+ if headerVersion >= 1 {
+ headerLength = 9
+ }
+
+ resHeader := make([]byte, headerLength)
+ binary.BigEndian.PutUint32(resHeader, payloadLength+headerLength-4)
+ binary.BigEndian.PutUint32(resHeader[4:], uint32(correlationId))
+
+ if headerVersion >= 1 {
+ binary.PutUvarint(resHeader[8:], 0)
+ }
+
+ return resHeader
+}
+
+func (b *MockBroker) defaultRequestHandler(req *request) (res encoderWithHeader) {
select {
case res, ok := <-b.expectations:
if !ok {
@@ -377,7 +396,7 @@
stopper: make(chan none),
t: t,
brokerID: brokerID,
- expectations: make(chan encoder, 512),
+ expectations: make(chan encoderWithHeader, 512),
listener: listener,
}
broker.handler = broker.defaultRequestHandler
@@ -398,6 +417,6 @@
return broker
}
-func (b *MockBroker) Returns(e encoder) {
+func (b *MockBroker) Returns(e encoderWithHeader) {
b.expectations <- e
}