VOL-1921 - updated to use go mod

Change-Id: I8d5187fa91fa619494f972bc29d3bd61e5be3a82
diff --git a/vendor/github.com/Shopify/sarama/CHANGELOG.md b/vendor/github.com/Shopify/sarama/CHANGELOG.md
index 4f89e21..02bd0ff 100644
--- a/vendor/github.com/Shopify/sarama/CHANGELOG.md
+++ b/vendor/github.com/Shopify/sarama/CHANGELOG.md
@@ -1,5 +1,13 @@
 # Changelog
 
+#### Version 1.23.1 (2019-07-22)
+
+Bug Fixes:
+- Fix fetch delete bug record
+  ([1425](https://github.com/Shopify/sarama/pull/1425)).
+- Handle SASL/OAUTHBEARER token rejection
+  ([1428](https://github.com/Shopify/sarama/pull/1428)).
+
 #### Version 1.23.0 (2019-07-02)
 
 New Features:
diff --git a/vendor/github.com/Shopify/sarama/admin.go b/vendor/github.com/Shopify/sarama/admin.go
index a4d1bc5..1db6a0e 100644
--- a/vendor/github.com/Shopify/sarama/admin.go
+++ b/vendor/github.com/Shopify/sarama/admin.go
@@ -374,29 +374,50 @@
 	if topic == "" {
 		return ErrInvalidTopic
 	}
-
-	topics := make(map[string]*DeleteRecordsRequestTopic)
-	topics[topic] = &DeleteRecordsRequestTopic{PartitionOffsets: partitionOffsets}
-	request := &DeleteRecordsRequest{
-		Topics:  topics,
-		Timeout: ca.conf.Admin.Timeout,
+	partitionPerBroker := make(map[*Broker][]int32)
+	for partition := range partitionOffsets {
+		broker, err := ca.client.Leader(topic, partition)
+		if err != nil {
+			return err
+		}
+		if _, ok := partitionPerBroker[broker]; ok {
+			partitionPerBroker[broker] = append(partitionPerBroker[broker], partition)
+		} else {
+			partitionPerBroker[broker] = []int32{partition}
+		}
 	}
+	errs := make([]error, 0)
+	for broker, partitions := range partitionPerBroker {
+		topics := make(map[string]*DeleteRecordsRequestTopic)
+		recordsToDelete := make(map[int32]int64)
+		for _, p := range partitions {
+			recordsToDelete[p] = partitionOffsets[p]
+		}
+		topics[topic] = &DeleteRecordsRequestTopic{PartitionOffsets: recordsToDelete}
+		request := &DeleteRecordsRequest{
+			Topics:  topics,
+			Timeout: ca.conf.Admin.Timeout,
+		}
 
-	b, err := ca.Controller()
-	if err != nil {
-		return err
+		rsp, err := broker.DeleteRecords(request)
+		if err != nil {
+			errs = append(errs, err)
+		} else {
+			deleteRecordsResponseTopic, ok := rsp.Topics[topic]
+			if !ok {
+				errs = append(errs, ErrIncompleteResponse)
+			} else {
+				for _, deleteRecordsResponsePartition := range deleteRecordsResponseTopic.Partitions {
+					if deleteRecordsResponsePartition.Err != ErrNoError {
+						errs = append(errs, errors.New(deleteRecordsResponsePartition.Err.Error()))
+					}
+				}
+			}
+		}
 	}
-
-	rsp, err := b.DeleteRecords(request)
-	if err != nil {
-		return err
+	if len(errs) > 0 {
+		return ErrDeleteRecords{MultiError{&errs}}
 	}
-
-	_, ok := rsp.Topics[topic]
-	if !ok {
-		return ErrIncompleteResponse
-	}
-
 	//todo since we are dealing with couple of partitions it would be good if we return slice of errors
 	//for each partition instead of one error
 	return nil
diff --git a/vendor/github.com/Shopify/sarama/broker.go b/vendor/github.com/Shopify/sarama/broker.go
index 7b32a03..9c3e5a0 100644
--- a/vendor/github.com/Shopify/sarama/broker.go
+++ b/vendor/github.com/Shopify/sarama/broker.go
@@ -1013,7 +1013,7 @@
 
 	b.correlationID++
 
-	bytesRead, err := b.receiveSASLServerResponse(correlationID)
+	bytesRead, err := b.receiveSASLServerResponse(&SaslAuthenticateResponse{}, correlationID)
 	b.updateIncomingCommunicationMetrics(bytesRead, time.Since(requestTime))
 
 	// With v1 sasl we get an error message set in the response we can return
@@ -1037,26 +1037,53 @@
 		return err
 	}
 
+	message, err := buildClientFirstMessage(token)
+	if err != nil {
+		return err
+	}
+
+	challenged, err := b.sendClientMessage(message)
+	if err != nil {
+		return err
+	}
+
+	if challenged {
+		// Abort the token exchange. The broker returns the failure code.
+		_, err = b.sendClientMessage([]byte(`\x01`))
+	}
+
+	return err
+}
+
+// sendClientMessage sends a SASL/OAUTHBEARER client message and returns true
+// if the broker responds with a challenge, in which case the token is
+// rejected.
+func (b *Broker) sendClientMessage(message []byte) (bool, error) {
+
 	requestTime := time.Now()
 	correlationID := b.correlationID
 
-	bytesWritten, err := b.sendSASLOAuthBearerClientResponse(token, correlationID)
+	bytesWritten, err := b.sendSASLOAuthBearerClientMessage(message, correlationID)
 	if err != nil {
-		return err
+		return false, err
 	}
 
 	b.updateOutgoingCommunicationMetrics(bytesWritten)
 	b.correlationID++
 
-	bytesRead, err := b.receiveSASLServerResponse(correlationID)
-	if err != nil {
-		return err
-	}
+	res := &SaslAuthenticateResponse{}
+	bytesRead, err := b.receiveSASLServerResponse(res, correlationID)
 
 	requestLatency := time.Since(requestTime)
 	b.updateIncomingCommunicationMetrics(bytesRead, requestLatency)
 
-	return nil
+	isChallenge := len(res.SaslAuthBytes) > 0
+
+	if isChallenge && err != nil {
+		Logger.Printf("Broker rejected authentication token: %s", res.SaslAuthBytes)
+	}
+
+	return isChallenge, err
 }
 
 func (b *Broker) sendAndReceiveSASLSCRAMv1() error {
@@ -1154,7 +1181,7 @@
 
 // Build SASL/OAUTHBEARER initial client response as described by RFC-7628
 // https://tools.ietf.org/html/rfc7628
-func buildClientInitialResponse(token *AccessToken) ([]byte, error) {
+func buildClientFirstMessage(token *AccessToken) ([]byte, error) {
 	var ext string
 
 	if token.Extensions != nil && len(token.Extensions) > 0 {
@@ -1200,11 +1227,7 @@
 	return b.conn.Write(buf)
 }
 
-func (b *Broker) sendSASLOAuthBearerClientResponse(token *AccessToken, correlationID int32) (int, error) {
-	initialResp, err := buildClientInitialResponse(token)
-	if err != nil {
-		return 0, err
-	}
+func (b *Broker) sendSASLOAuthBearerClientMessage(initialResp []byte, correlationID int32) (int, error) {
 
 	rb := &SaslAuthenticateRequest{initialResp}
 
@@ -1222,7 +1245,7 @@
 	return b.conn.Write(buf)
 }
 
-func (b *Broker) receiveSASLServerResponse(correlationID int32) (int, error) {
+func (b *Broker) receiveSASLServerResponse(res *SaslAuthenticateResponse, correlationID int32) (int, error) {
 
 	buf := make([]byte, responseLengthSize+correlationIDSize)
 
@@ -1250,8 +1273,6 @@
 		return bytesRead, err
 	}
 
-	res := &SaslAuthenticateResponse{}
-
 	if err := versionedDecode(buf, res, 0); err != nil {
 		return bytesRead, err
 	}
@@ -1260,10 +1281,6 @@
 		return bytesRead, res.Err
 	}
 
-	if len(res.SaslAuthBytes) > 0 {
-		Logger.Printf("Received SASL auth response: %s", res.SaslAuthBytes)
-	}
-
 	return bytesRead, nil
 }
 
diff --git a/vendor/github.com/Shopify/sarama/errors.go b/vendor/github.com/Shopify/sarama/errors.go
index 8ecb652..c6a8be7 100644
--- a/vendor/github.com/Shopify/sarama/errors.go
+++ b/vendor/github.com/Shopify/sarama/errors.go
@@ -81,6 +81,28 @@
 // See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes
 type KError int16
 
+// MultiError is used to contain multi error.
+type MultiError struct {
+	Errors *[]error
+}
+
+func (mErr MultiError) Error() string {
+	var errString = ""
+	for _, err := range *mErr.Errors {
+		errString += err.Error() + ","
+	}
+	return errString
+}
+
+// ErrDeleteRecords is the type of error returned when fail to delete the required records
+type ErrDeleteRecords struct {
+	MultiError
+}
+
+func (err ErrDeleteRecords) Error() string {
+	return "kafka server: failed to delete records " + err.MultiError.Error()
+}
+
 // Numeric error codes returned by the Kafka server.
 const (
 	ErrNoError                            KError = 0