VOL-1921 - updated to use go mod

Change-Id: I8d5187fa91fa619494f972bc29d3bd61e5be3a82
diff --git a/vendor/github.com/Shopify/sarama/broker.go b/vendor/github.com/Shopify/sarama/broker.go
index 7b32a03..9c3e5a0 100644
--- a/vendor/github.com/Shopify/sarama/broker.go
+++ b/vendor/github.com/Shopify/sarama/broker.go
@@ -1013,7 +1013,7 @@
 
 	b.correlationID++
 
-	bytesRead, err := b.receiveSASLServerResponse(correlationID)
+	bytesRead, err := b.receiveSASLServerResponse(&SaslAuthenticateResponse{}, correlationID)
 	b.updateIncomingCommunicationMetrics(bytesRead, time.Since(requestTime))
 
 	// With v1 sasl we get an error message set in the response we can return
@@ -1037,26 +1037,53 @@
 		return err
 	}
 
+	message, err := buildClientFirstMessage(token)
+	if err != nil {
+		return err
+	}
+
+	challenged, err := b.sendClientMessage(message)
+	if err != nil {
+		return err
+	}
+
+	if challenged {
+		// Abort the token exchange. The broker returns the failure code.
+		_, err = b.sendClientMessage([]byte(`\x01`))
+	}
+
+	return err
+}
+
+// sendClientMessage sends a SASL/OAUTHBEARER client message and returns true
+// if the broker responds with a challenge, in which case the token is
+// rejected.
+func (b *Broker) sendClientMessage(message []byte) (bool, error) {
+
 	requestTime := time.Now()
 	correlationID := b.correlationID
 
-	bytesWritten, err := b.sendSASLOAuthBearerClientResponse(token, correlationID)
+	bytesWritten, err := b.sendSASLOAuthBearerClientMessage(message, correlationID)
 	if err != nil {
-		return err
+		return false, err
 	}
 
 	b.updateOutgoingCommunicationMetrics(bytesWritten)
 	b.correlationID++
 
-	bytesRead, err := b.receiveSASLServerResponse(correlationID)
-	if err != nil {
-		return err
-	}
+	res := &SaslAuthenticateResponse{}
+	bytesRead, err := b.receiveSASLServerResponse(res, correlationID)
 
 	requestLatency := time.Since(requestTime)
 	b.updateIncomingCommunicationMetrics(bytesRead, requestLatency)
 
-	return nil
+	isChallenge := len(res.SaslAuthBytes) > 0
+
+	if isChallenge && err != nil {
+		Logger.Printf("Broker rejected authentication token: %s", res.SaslAuthBytes)
+	}
+
+	return isChallenge, err
 }
 
 func (b *Broker) sendAndReceiveSASLSCRAMv1() error {
@@ -1154,7 +1181,7 @@
 
 // Build SASL/OAUTHBEARER initial client response as described by RFC-7628
 // https://tools.ietf.org/html/rfc7628
-func buildClientInitialResponse(token *AccessToken) ([]byte, error) {
+func buildClientFirstMessage(token *AccessToken) ([]byte, error) {
 	var ext string
 
 	if token.Extensions != nil && len(token.Extensions) > 0 {
@@ -1200,11 +1227,7 @@
 	return b.conn.Write(buf)
 }
 
-func (b *Broker) sendSASLOAuthBearerClientResponse(token *AccessToken, correlationID int32) (int, error) {
-	initialResp, err := buildClientInitialResponse(token)
-	if err != nil {
-		return 0, err
-	}
+func (b *Broker) sendSASLOAuthBearerClientMessage(initialResp []byte, correlationID int32) (int, error) {
 
 	rb := &SaslAuthenticateRequest{initialResp}
 
@@ -1222,7 +1245,7 @@
 	return b.conn.Write(buf)
 }
 
-func (b *Broker) receiveSASLServerResponse(correlationID int32) (int, error) {
+func (b *Broker) receiveSASLServerResponse(res *SaslAuthenticateResponse, correlationID int32) (int, error) {
 
 	buf := make([]byte, responseLengthSize+correlationIDSize)
 
@@ -1250,8 +1273,6 @@
 		return bytesRead, err
 	}
 
-	res := &SaslAuthenticateResponse{}
-
 	if err := versionedDecode(buf, res, 0); err != nil {
 		return bytesRead, err
 	}
@@ -1260,10 +1281,6 @@
 		return bytesRead, res.Err
 	}
 
-	if len(res.SaslAuthBytes) > 0 {
-		Logger.Printf("Received SASL auth response: %s", res.SaslAuthBytes)
-	}
-
 	return bytesRead, nil
 }