VOL-1921 - updated to use go mod
Change-Id: I8d5187fa91fa619494f972bc29d3bd61e5be3a82
diff --git a/vendor/github.com/Shopify/sarama/broker.go b/vendor/github.com/Shopify/sarama/broker.go
index 7b32a03..9c3e5a0 100644
--- a/vendor/github.com/Shopify/sarama/broker.go
+++ b/vendor/github.com/Shopify/sarama/broker.go
@@ -1013,7 +1013,7 @@
b.correlationID++
- bytesRead, err := b.receiveSASLServerResponse(correlationID)
+ bytesRead, err := b.receiveSASLServerResponse(&SaslAuthenticateResponse{}, correlationID)
b.updateIncomingCommunicationMetrics(bytesRead, time.Since(requestTime))
// With v1 sasl we get an error message set in the response we can return
@@ -1037,26 +1037,53 @@
return err
}
+ message, err := buildClientFirstMessage(token)
+ if err != nil {
+ return err
+ }
+
+ challenged, err := b.sendClientMessage(message)
+ if err != nil {
+ return err
+ }
+
+ if challenged {
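+ // Abort the token exchange with the RFC 7628 dummy response (a single 0x01 byte); the broker then returns the failure code. NOTE(review): the backquoted literal below is the four characters \x01, not the byte 0x01 - confirm and use "\x01" if so.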
+ _, err = b.sendClientMessage([]byte(`\x01`))
+ }
+
+ return err
+}
+
+// sendClientMessage sends one SASL/OAUTHBEARER client message to the broker
+// and reads the broker's SaslAuthenticate response, updating the outgoing and
+// incoming communication metrics along the way.
+//
+// It returns true if the response carries SASL auth bytes, i.e. the broker
+// answered with a challenge, in which case the token is rejected. The error
+// is whatever sending the message or receiving the response reported.
+func (b *Broker) sendClientMessage(message []byte) (bool, error) {
+
requestTime := time.Now()
correlationID := b.correlationID
- bytesWritten, err := b.sendSASLOAuthBearerClientResponse(token, correlationID)
+ bytesWritten, err := b.sendSASLOAuthBearerClientMessage(message, correlationID)
if err != nil {
- return err
+ return false, err
}
b.updateOutgoingCommunicationMetrics(bytesWritten)
b.correlationID++
- bytesRead, err := b.receiveSASLServerResponse(correlationID)
- if err != nil {
- return err
- }
+ res := &SaslAuthenticateResponse{}
+ bytesRead, err := b.receiveSASLServerResponse(res, correlationID)
requestLatency := time.Since(requestTime)
b.updateIncomingCommunicationMetrics(bytesRead, requestLatency)
- return nil
+ isChallenge := len(res.SaslAuthBytes) > 0
+
+ // A challenge is itself the rejection, and the caller only acts on it when
+ // no error is set, so log the broker's payload whenever one is present
+ // rather than only when the receive also failed.
+ if isChallenge {
+ Logger.Printf("Broker rejected authentication token: %s", res.SaslAuthBytes)
+ }
+
+ return isChallenge, err
}
func (b *Broker) sendAndReceiveSASLSCRAMv1() error {
@@ -1154,7 +1181,7 @@
// Build SASL/OAUTHBEARER initial client response as described by RFC-7628
// https://tools.ietf.org/html/rfc7628
-func buildClientInitialResponse(token *AccessToken) ([]byte, error) {
+func buildClientFirstMessage(token *AccessToken) ([]byte, error) {
var ext string
if token.Extensions != nil && len(token.Extensions) > 0 {
@@ -1200,11 +1227,7 @@
return b.conn.Write(buf)
}
-func (b *Broker) sendSASLOAuthBearerClientResponse(token *AccessToken, correlationID int32) (int, error) {
- initialResp, err := buildClientInitialResponse(token)
- if err != nil {
- return 0, err
- }
+func (b *Broker) sendSASLOAuthBearerClientMessage(initialResp []byte, correlationID int32) (int, error) {
rb := &SaslAuthenticateRequest{initialResp}
@@ -1222,7 +1245,7 @@
return b.conn.Write(buf)
}
-func (b *Broker) receiveSASLServerResponse(correlationID int32) (int, error) {
+func (b *Broker) receiveSASLServerResponse(res *SaslAuthenticateResponse, correlationID int32) (int, error) {
buf := make([]byte, responseLengthSize+correlationIDSize)
@@ -1250,8 +1273,6 @@
return bytesRead, err
}
- res := &SaslAuthenticateResponse{}
-
if err := versionedDecode(buf, res, 0); err != nil {
return bytesRead, err
}
@@ -1260,10 +1281,6 @@
return bytesRead, res.Err
}
- if len(res.SaslAuthBytes) > 0 {
- Logger.Printf("Received SASL auth response: %s", res.SaslAuthBytes)
- }
-
return bytesRead, nil
}