[VOL-4291] Rw-core updates for gRPC migration

Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/vendor/github.com/Shopify/sarama/client.go b/vendor/github.com/Shopify/sarama/client.go
index c4c54b2..c0918ba 100644
--- a/vendor/github.com/Shopify/sarama/client.go
+++ b/vendor/github.com/Shopify/sarama/client.go
@@ -17,12 +17,21 @@
 	// altered after it has been created.
 	Config() *Config
 
-	// Controller returns the cluster controller broker. Requires Kafka 0.10 or higher.
+	// Controller returns the cluster controller broker. It will return a
+	// locally cached value if it's available. You can call RefreshController
+	// to update the cached value. Requires Kafka 0.10 or higher.
 	Controller() (*Broker, error)
 
+	// RefreshController retrieves the cluster controller from fresh metadata
+	// and stores it in the local cache. Requires Kafka 0.10 or higher.
+	RefreshController() (*Broker, error)
+
 	// Brokers returns the current set of active brokers as retrieved from cluster metadata.
 	Brokers() []*Broker
 
+	// Broker returns the active Broker if available for the broker ID.
+	Broker(brokerID int32) (*Broker, error)
+
 	// Topics returns the set of available topics as retrieved from cluster metadata.
 	Topics() ([]string, error)
 
@@ -50,6 +59,11 @@
 	// partition. Offline replicas are replicas which are offline
 	OfflineReplicas(topic string, partitionID int32) ([]int32, error)
 
+	// RefreshBrokers takes a list of addresses to be used as seed brokers.
+	// Existing broker connections are closed and the updated list of seed brokers
+	// will be used for the next metadata fetch.
+	RefreshBrokers(addrs []string) error
+
 	// RefreshMetadata takes a list of topics and queries the cluster to refresh the
 	// available metadata for those topics. If no topics are provided, it will refresh
 	// metadata for all topics.
@@ -149,10 +163,7 @@
 		coordinators:            make(map[string]int32),
 	}
 
-	random := rand.New(rand.NewSource(time.Now().UnixNano()))
-	for _, index := range random.Perm(len(addrs)) {
-		client.seedBrokers = append(client.seedBrokers, NewBroker(addrs[index]))
-	}
+	client.randomizeSeedBrokers(addrs)
 
 	if conf.Metadata.Full {
 		// do an initial fetch of all cluster metadata by specifying an empty list of topics
@@ -190,10 +201,20 @@
 	return brokers
 }
 
+func (client *client) Broker(brokerID int32) (*Broker, error) {
+	client.lock.RLock()
+	defer client.lock.RUnlock()
+	broker, ok := client.brokers[brokerID]
+	if !ok {
+		return nil, ErrBrokerNotFound // no entry in client.brokers for this ID
+	}
+	_ = broker.Open(client.conf) // the error from Open is explicitly discarded; the broker is returned regardless
+	return broker, nil
+}
+
 func (client *client) InitProducerID() (*InitProducerIDResponse, error) {
 	var err error
 	for broker := client.any(); broker != nil; broker = client.any() {
-
 		req := &InitProducerIDRequest{}
 
 		response, err := broker.InitProducerID(req)
@@ -242,6 +263,9 @@
 }
 
 func (client *client) Closed() bool {
+	client.lock.RLock() // read lock: other methods modify client.brokers while holding client.lock
+	defer client.lock.RUnlock()
+
 	return client.brokers == nil
 }
 
@@ -421,6 +445,27 @@
 	return leader, err
 }
 
+func (client *client) RefreshBrokers(addrs []string) error {
+	if client.Closed() { // NOTE(review): Closed() releases its read lock before the write lock below is taken, so this check is not atomic with the update
+		return ErrClosedClient
+	}
+
+	client.lock.Lock()
+	defer client.lock.Unlock()
+
+	for _, broker := range client.brokers {
+		_ = broker.Close()
+		delete(client.brokers, broker.ID()) // removing entries while ranging over a map is permitted in Go
+	}
+
+	client.seedBrokers = nil // start from an empty seed list; randomizeSeedBrokers below only appends
+	client.deadSeeds = nil
+
+	client.randomizeSeedBrokers(addrs)
+
+	return nil
+}
+
 func (client *client) RefreshMetadata(topics ...string) error {
 	if client.Closed() {
 		return ErrClosedClient
@@ -430,7 +475,7 @@
 	// error. This handles the case by returning an error instead of sending it
 	// off to Kafka. See: https://github.com/Shopify/sarama/pull/38#issuecomment-26362310
 	for _, topic := range topics {
-		if len(topic) == 0 {
+		if topic == "" {
 			return ErrInvalidTopic // this is the error that 0.8.2 and later correctly return
 		}
 	}
@@ -448,7 +493,6 @@
 	}
 
 	offset, err := client.getOffset(topic, partitionID, time)
-
 	if err != nil {
 		if err := client.RefreshMetadata(topic); err != nil {
 			return -1, err
@@ -484,6 +528,35 @@
 	return controller, nil
 }
 
+// deregisterController removes the broker stored under the cached controllerID from client.brokers, taking the write lock itself.
+func (client *client) deregisterController() {
+	client.lock.Lock()
+	defer client.lock.Unlock()
+	delete(client.brokers, client.controllerID) // NOTE(review): the entry is only removed from the map, not closed; confirm that is intended
+}
+
+// RefreshController retrieves the cluster controller from fresh metadata
+// and stores it in the local cache. Requires Kafka 0.10 or higher.
+func (client *client) RefreshController() (*Broker, error) {
+	if client.Closed() {
+		return nil, ErrClosedClient
+	}
+
+	client.deregisterController() // drop the currently cached controller broker before refreshing metadata
+
+	if err := client.refreshMetadata(); err != nil {
+		return nil, err
+	}
+
+	controller := client.cachedController()
+	if controller == nil {
+		return nil, ErrControllerNotAvailable
+	}
+
+	_ = controller.Open(client.conf) // the error from Open is explicitly discarded; the controller is returned regardless
+	return controller, nil
+}
+
 func (client *client) Coordinator(consumerGroup string) (*Broker, error) {
 	if client.Closed() {
 		return nil, ErrClosedClient
@@ -525,10 +598,46 @@
 
 // private broker management helpers
 
+func (client *client) randomizeSeedBrokers(addrs []string) { // appends one new Broker per address to client.seedBrokers in a random order; takes no lock itself
+	random := rand.New(rand.NewSource(time.Now().UnixNano()))
+	for _, index := range random.Perm(len(addrs)) {
+		client.seedBrokers = append(client.seedBrokers, NewBroker(addrs[index]))
+	}
+}
+
+func (client *client) updateBroker(brokers []*Broker) { // reconciles client.brokers with the given list; takes no lock itself (updateMetadata calls it under client.lock). NOTE(review): no nil-map guard here, unlike registerBroker
+	currentBroker := make(map[int32]*Broker, len(brokers))
+
+	for _, broker := range brokers {
+		currentBroker[broker.ID()] = broker
+		if client.brokers[broker.ID()] == nil { // add new broker
+			client.brokers[broker.ID()] = broker
+			Logger.Printf("client/brokers registered new broker #%d at %s", broker.ID(), broker.Addr())
+		} else if broker.Addr() != client.brokers[broker.ID()].Addr() { // replace broker with new address
+			safeAsyncClose(client.brokers[broker.ID()])
+			client.brokers[broker.ID()] = broker
+			Logger.Printf("client/brokers replaced registered broker #%d with %s", broker.ID(), broker.Addr())
+		}
+	}
+
+	for id, broker := range client.brokers {
+		if _, exist := currentBroker[id]; !exist { // remove old broker
+			safeAsyncClose(broker)
+			delete(client.brokers, id)
+			Logger.Printf("client/broker remove invalid broker #%d with %s", broker.ID(), broker.Addr())
+		}
+	}
+}
+
 // registerBroker makes sure a broker received by a Metadata or Coordinator request is registered
 // in the brokers map. It returns the broker that is registered, which may be the provided broker,
 // or a previously registered Broker instance. You must hold the write lock before calling this function.
 func (client *client) registerBroker(broker *Broker) {
+	if client.brokers == nil {
+		Logger.Printf("cannot register broker #%d at %s, client already closed", broker.ID(), broker.Addr())
+		return
+	}
+
 	if client.brokers[broker.ID()] == nil {
 		client.brokers[broker.ID()] = broker
 		Logger.Printf("client/brokers registered new broker #%d at %s", broker.ID(), broker.Addr())
@@ -722,7 +831,7 @@
 }
 
 func (client *client) refreshMetadata() error {
-	topics := []string{}
+	var topics []string
 
 	if !client.conf.Metadata.Full {
 		if specificTopics, err := client.MetadataTopics(); err != nil {
@@ -756,7 +865,7 @@
 				Logger.Println("client/metadata skipping last retries as we would go past the metadata timeout")
 				return err
 			}
-			Logger.Printf("client/metadata retrying after %dms... (%d attempts remaining)\n", client.conf.Metadata.Retry.Backoff/time.Millisecond, attemptsRemaining)
+			Logger.Printf("client/metadata retrying after %dms... (%d attempts remaining)\n", backoff/time.Millisecond, attemptsRemaining)
 			if backoff > 0 {
 				time.Sleep(backoff)
 			}
@@ -782,7 +891,7 @@
 			req.Version = 1
 		}
 		response, err := broker.GetMetadata(req)
-		switch err.(type) {
+		switch err := err.(type) {
 		case nil:
 			allKnownMetaData := len(topics) == 0
 			// valid response, use it
@@ -799,10 +908,15 @@
 
 		case KError:
 			// if SASL auth error return as this _should_ be a non retryable err for all brokers
-			if err.(KError) == ErrSASLAuthenticationFailed {
+			if err == ErrSASLAuthenticationFailed {
 				Logger.Println("client/metadata failed SASL authentication")
 				return err
 			}
+
+			if err == ErrTopicAuthorizationFailed {
+				Logger.Println("client is not authorized to access this topic. The topics were: ", topics)
+				return err
+			}
 			// else remove that broker and try again
 			Logger.Printf("client/metadata got error from broker %d while fetching metadata: %v\n", broker.ID(), err)
 			_ = broker.Close()
@@ -817,7 +931,7 @@
 	}
 
 	if broker != nil {
-		Logger.Println("client/metadata not fetching metadata from broker %s as we would go past the metadata timeout\n", broker.addr)
+		Logger.Printf("client/metadata not fetching metadata from broker %s as we would go past the metadata timeout\n", broker.addr)
 		return retry(ErrOutOfBrokers)
 	}
 
@@ -828,16 +942,19 @@
 
 // if no fatal error, returns a list of topics that need retrying due to ErrLeaderNotAvailable
 func (client *client) updateMetadata(data *MetadataResponse, allKnownMetaData bool) (retry bool, err error) {
+	if client.Closed() {
+		return
+	}
+
 	client.lock.Lock()
 	defer client.lock.Unlock()
 
 	// For all the brokers we received:
 	// - if it is a new ID, save it
 	// - if it is an existing ID, but the address we have is stale, discard the old one and save it
+	// - if a broker we have registered is missing from the response, close and remove it
 	// - otherwise ignore it, replacing our existing one would just bounce the connection
-	for _, broker := range data.Brokers {
-		client.registerBroker(broker)
-	}
+	client.updateBroker(data.Brokers)
 
 	client.controllerID = data.ControllerID
 
@@ -935,7 +1052,6 @@
 		request.CoordinatorType = CoordinatorGroup
 
 		response, err := broker.FindCoordinator(request)
-
 		if err != nil {
 			Logger.Printf("client/coordinator request to broker %s failed: %s\n", broker.Addr(), err)
 
@@ -966,6 +1082,10 @@
 			}
 
 			return retry(ErrConsumerCoordinatorNotAvailable)
+		case ErrGroupAuthorizationFailed:
+			Logger.Printf("client was not authorized to access group %s while attempting to find coordinator", consumerGroup)
+			return retry(ErrGroupAuthorizationFailed)
+
 		default:
 			return nil, response.Err
 		}