VOL-1967 move api-server to separate repository
Change-Id: I21b85be74205805be15f8a85e53a903d16785671
diff --git a/vendor/github.com/Shopify/sarama/consumer_group.go b/vendor/github.com/Shopify/sarama/consumer_group.go
index 8c8babc..8de9513 100644
--- a/vendor/github.com/Shopify/sarama/consumer_group.go
+++ b/vendor/github.com/Shopify/sarama/consumer_group.go
@@ -52,8 +52,7 @@
}
type consumerGroup struct {
- client Client
- ownClient bool
+ client Client
config *Config
consumer Consumer
@@ -73,20 +72,24 @@
return nil, err
}
- c, err := NewConsumerGroupFromClient(groupID, client)
+ c, err := newConsumerGroup(groupID, client)
if err != nil {
_ = client.Close()
- return nil, err
}
-
- c.(*consumerGroup).ownClient = true
- return c, nil
+ return c, err
}
// NewConsumerGroupFromClient creates a new consumer group using the given client. It is still
// necessary to call Close() on the underlying client when shutting down this consumer.
// PLEASE NOTE: consumer groups can only re-use but not share clients.
func NewConsumerGroupFromClient(groupID string, client Client) (ConsumerGroup, error) {
+	// For clients passed in by the caller, ensure we don't
+	// call Close() on them.
+ cli := &nopCloserClient{client}
+ return newConsumerGroup(groupID, cli)
+}
+
+func newConsumerGroup(groupID string, client Client) (ConsumerGroup, error) {
config := client.Config()
if !config.Version.IsAtLeast(V0_10_2_0) {
return nil, ConfigurationError("consumer groups require Version to be >= V0_10_2_0")
@@ -131,10 +134,8 @@
err = e
}
- if c.ownClient {
- if e := c.client.Close(); e != nil {
- err = e
- }
+ if e := c.client.Close(); e != nil {
+ err = e
}
})
return
@@ -162,14 +163,8 @@
return err
}
- // Get coordinator
- coordinator, err := c.client.Coordinator(c.groupID)
- if err != nil {
- return err
- }
-
// Init session
- sess, err := c.newSession(ctx, coordinator, topics, handler, c.config.Consumer.Group.Rebalance.Retry.Max)
+ sess, err := c.newSession(ctx, topics, handler, c.config.Consumer.Group.Rebalance.Retry.Max)
if err == ErrClosedClient {
return ErrClosedConsumerGroup
} else if err != nil {
@@ -183,7 +178,33 @@
return sess.release(true)
}
-func (c *consumerGroup) newSession(ctx context.Context, coordinator *Broker, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {
+// retryNewSession waits for the rebalance retry backoff (returning ErrClosedConsumerGroup if c.closed fires first), optionally refreshes the group coordinator, then calls newSession with one retry fewer; a failed refresh also costs one retry so the recursion is bounded by retries.
+func (c *consumerGroup) retryNewSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int, refreshCoordinator bool) (*consumerGroupSession, error) {
+	select {
+	case <-c.closed:
+		return nil, ErrClosedConsumerGroup
+	case <-time.After(c.config.Consumer.Group.Rebalance.Retry.Backoff):
+	}
+	if refreshCoordinator {
+		if err := c.client.RefreshCoordinator(c.groupID); err != nil && retries <= 1 {
+			return nil, err // retry budget exhausted: surface the refresh error instead of looping forever
+		} else if err != nil {
+			return c.retryNewSession(ctx, topics, handler, retries-1, true) // spend one retry, back off and refresh again
+		}
+	}
+	return c.newSession(ctx, topics, handler, retries-1)
+}
+
+func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {
+ coordinator, err := c.client.Coordinator(c.groupID)
+ if err != nil {
+ if retries <= 0 {
+ return nil, err
+ }
+
+ return c.retryNewSession(ctx, topics, handler, retries, true)
+ }
+
// Join consumer group
join, err := c.joinGroupRequest(coordinator, topics)
if err != nil {
@@ -195,19 +216,19 @@
c.memberID = join.MemberId
case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
c.memberID = ""
- return c.newSession(ctx, coordinator, topics, handler, retries)
+ return c.newSession(ctx, topics, handler, retries)
+ case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refresh
+ if retries <= 0 {
+ return nil, join.Err
+ }
+
+ return c.retryNewSession(ctx, topics, handler, retries, true)
case ErrRebalanceInProgress: // retry after backoff
if retries <= 0 {
return nil, join.Err
}
- select {
- case <-c.closed:
- return nil, ErrClosedConsumerGroup
- case <-time.After(c.config.Consumer.Group.Rebalance.Retry.Backoff):
- }
-
- return c.newSession(ctx, coordinator, topics, handler, retries-1)
+ return c.retryNewSession(ctx, topics, handler, retries, false)
default:
return nil, join.Err
}
@@ -236,19 +257,19 @@
case ErrNoError:
case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
c.memberID = ""
- return c.newSession(ctx, coordinator, topics, handler, retries)
+ return c.newSession(ctx, topics, handler, retries)
+ case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refresh
+ if retries <= 0 {
+ return nil, sync.Err
+ }
+
+ return c.retryNewSession(ctx, topics, handler, retries, true)
case ErrRebalanceInProgress: // retry after backoff
if retries <= 0 {
return nil, sync.Err
}
- select {
- case <-c.closed:
- return nil, ErrClosedConsumerGroup
- case <-time.After(c.config.Consumer.Group.Rebalance.Retry.Backoff):
- }
-
- return c.newSession(ctx, coordinator, topics, handler, retries-1)
+ return c.retryNewSession(ctx, topics, handler, retries, false)
default:
return nil, sync.Err
}
@@ -613,7 +634,7 @@
s.releaseOnce.Do(func() {
if withCleanup {
if e := s.handler.Cleanup(s); e != nil {
- s.parent.handleError(err, "", -1)
+ s.parent.handleError(e, "", -1)
err = e
}
}