VOL-1967 move api-server to separate repository

Change-Id: I21b85be74205805be15f8a85e53a903d16785671
diff --git a/vendor/github.com/Shopify/sarama/consumer_group.go b/vendor/github.com/Shopify/sarama/consumer_group.go
index 8c8babc..8de9513 100644
--- a/vendor/github.com/Shopify/sarama/consumer_group.go
+++ b/vendor/github.com/Shopify/sarama/consumer_group.go
@@ -52,8 +52,7 @@
 }
 
 type consumerGroup struct {
-	client    Client
-	ownClient bool
+	client Client
 
 	config   *Config
 	consumer Consumer
@@ -73,20 +72,24 @@
 		return nil, err
 	}
 
-	c, err := NewConsumerGroupFromClient(groupID, client)
+	c, err := newConsumerGroup(groupID, client)
 	if err != nil {
 		_ = client.Close()
-		return nil, err
 	}
-
-	c.(*consumerGroup).ownClient = true
-	return c, nil
+	return c, err
 }
 
 // NewConsumerGroupFromClient creates a new consumer group using the given client. It is still
 // necessary to call Close() on the underlying client when shutting down this consumer.
 // PLEASE NOTE: consumer groups can only re-use but not share clients.
 func NewConsumerGroupFromClient(groupID string, client Client) (ConsumerGroup, error) {
+	// For clients passed in by the client, ensure we don't
+	// call Close() on it.
+	cli := &nopCloserClient{client}
+	return newConsumerGroup(groupID, cli)
+}
+
+func newConsumerGroup(groupID string, client Client) (ConsumerGroup, error) {
 	config := client.Config()
 	if !config.Version.IsAtLeast(V0_10_2_0) {
 		return nil, ConfigurationError("consumer groups require Version to be >= V0_10_2_0")
@@ -131,10 +134,8 @@
 			err = e
 		}
 
-		if c.ownClient {
-			if e := c.client.Close(); e != nil {
-				err = e
-			}
+		if e := c.client.Close(); e != nil {
+			err = e
 		}
 	})
 	return
@@ -162,14 +163,8 @@
 		return err
 	}
 
-	// Get coordinator
-	coordinator, err := c.client.Coordinator(c.groupID)
-	if err != nil {
-		return err
-	}
-
 	// Init session
-	sess, err := c.newSession(ctx, coordinator, topics, handler, c.config.Consumer.Group.Rebalance.Retry.Max)
+	sess, err := c.newSession(ctx, topics, handler, c.config.Consumer.Group.Rebalance.Retry.Max)
 	if err == ErrClosedClient {
 		return ErrClosedConsumerGroup
 	} else if err != nil {
@@ -183,7 +178,33 @@
 	return sess.release(true)
 }
 
-func (c *consumerGroup) newSession(ctx context.Context, coordinator *Broker, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {
+func (c *consumerGroup) retryNewSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int, refreshCoordinator bool) (*consumerGroupSession, error) {
+	select {
+	case <-c.closed:
+		return nil, ErrClosedConsumerGroup
+	case <-time.After(c.config.Consumer.Group.Rebalance.Retry.Backoff):
+	}
+
+	if refreshCoordinator {
+		err := c.client.RefreshCoordinator(c.groupID)
+		if err != nil {
+			return c.retryNewSession(ctx, topics, handler, retries, true)
+		}
+	}
+
+	return c.newSession(ctx, topics, handler, retries-1)
+}
+
+func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {
+	coordinator, err := c.client.Coordinator(c.groupID)
+	if err != nil {
+		if retries <= 0 {
+			return nil, err
+		}
+
+		return c.retryNewSession(ctx, topics, handler, retries, true)
+	}
+
 	// Join consumer group
 	join, err := c.joinGroupRequest(coordinator, topics)
 	if err != nil {
@@ -195,19 +216,19 @@
 		c.memberID = join.MemberId
 	case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
 		c.memberID = ""
-		return c.newSession(ctx, coordinator, topics, handler, retries)
+		return c.newSession(ctx, topics, handler, retries)
+	case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refresh
+		if retries <= 0 {
+			return nil, join.Err
+		}
+
+		return c.retryNewSession(ctx, topics, handler, retries, true)
 	case ErrRebalanceInProgress: // retry after backoff
 		if retries <= 0 {
 			return nil, join.Err
 		}
 
-		select {
-		case <-c.closed:
-			return nil, ErrClosedConsumerGroup
-		case <-time.After(c.config.Consumer.Group.Rebalance.Retry.Backoff):
-		}
-
-		return c.newSession(ctx, coordinator, topics, handler, retries-1)
+		return c.retryNewSession(ctx, topics, handler, retries, false)
 	default:
 		return nil, join.Err
 	}
@@ -236,19 +257,19 @@
 	case ErrNoError:
 	case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
 		c.memberID = ""
-		return c.newSession(ctx, coordinator, topics, handler, retries)
+		return c.newSession(ctx, topics, handler, retries)
+	case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refresh
+		if retries <= 0 {
+			return nil, sync.Err
+		}
+
+		return c.retryNewSession(ctx, topics, handler, retries, true)
 	case ErrRebalanceInProgress: // retry after backoff
 		if retries <= 0 {
 			return nil, sync.Err
 		}
 
-		select {
-		case <-c.closed:
-			return nil, ErrClosedConsumerGroup
-		case <-time.After(c.config.Consumer.Group.Rebalance.Retry.Backoff):
-		}
-
-		return c.newSession(ctx, coordinator, topics, handler, retries-1)
+		return c.retryNewSession(ctx, topics, handler, retries, false)
 	default:
 		return nil, sync.Err
 	}
@@ -613,7 +634,7 @@
 	s.releaseOnce.Do(func() {
 		if withCleanup {
 			if e := s.handler.Cleanup(s); e != nil {
-				s.parent.handleError(err, "", -1)
+				s.parent.handleError(e, "", -1)
 				err = e
 			}
 		}