[VOL-4291] Rw-core updates for gRPC migration
Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/vendor/github.com/Shopify/sarama/consumer_group.go b/vendor/github.com/Shopify/sarama/consumer_group.go
index 8de9513..2bf236a 100644
--- a/vendor/github.com/Shopify/sarama/consumer_group.go
+++ b/vendor/github.com/Shopify/sarama/consumer_group.go
@@ -38,6 +38,9 @@
// as quickly as possible to allow time for Cleanup() and the final offset commit. If the timeout
// is exceeded, the consumer will be removed from the group by Kafka, which will cause offset
// commit failures.
+ // This method should be called inside an infinite loop; when a
+ // server-side rebalance happens, the consumer session will need to be
+ // recreated to get the new claims.
Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error
// Errors returns a read channel of errors that occurred during the consumer life-cycle.
@@ -63,6 +66,8 @@
lock sync.Mutex
closed chan none
closeOnce sync.Once
+
+ userData []byte
}
// NewConsumerGroup creates a new consumer group the given broker addresses and configuration.
@@ -118,9 +123,6 @@
c.closeOnce.Do(func() {
close(c.closed)
- c.lock.Lock()
- defer c.lock.Unlock()
-
// leave group
if e := c.leave(); e != nil {
err = e
@@ -171,6 +173,11 @@
return err
}
+ // Periodically check whether the partition count of any topic has changed;
+ // a detected change triggers a rebalance by ending the current session.
+ // Started once per session so repeated Consume calls do not spawn extra loopCheckPartitionNumbers goroutines.
+ go c.loopCheckPartitionNumbers(topics, sess)
+
// Wait for session exit signal
<-sess.ctx.Done()
@@ -248,40 +255,41 @@
}
// Sync consumer group
- sync, err := c.syncGroupRequest(coordinator, plan, join.GenerationId)
+ groupRequest, err := c.syncGroupRequest(coordinator, plan, join.GenerationId)
if err != nil {
_ = coordinator.Close()
return nil, err
}
- switch sync.Err {
+ switch groupRequest.Err {
case ErrNoError:
case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
c.memberID = ""
return c.newSession(ctx, topics, handler, retries)
case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refresh
if retries <= 0 {
- return nil, sync.Err
+ return nil, groupRequest.Err
}
return c.retryNewSession(ctx, topics, handler, retries, true)
case ErrRebalanceInProgress: // retry after backoff
if retries <= 0 {
- return nil, sync.Err
+ return nil, groupRequest.Err
}
return c.retryNewSession(ctx, topics, handler, retries, false)
default:
- return nil, sync.Err
+ return nil, groupRequest.Err
}
// Retrieve and sort claims
var claims map[string][]int32
- if len(sync.MemberAssignment) > 0 {
- members, err := sync.GetMemberAssignment()
+ if len(groupRequest.MemberAssignment) > 0 {
+ members, err := groupRequest.GetMemberAssignment()
if err != nil {
return nil, err
}
claims = members.Topics
+ c.userData = members.UserData
for _, partitions := range claims {
sort.Sort(int32Slice(partitions))
@@ -303,9 +311,14 @@
req.RebalanceTimeout = int32(c.config.Consumer.Group.Rebalance.Timeout / time.Millisecond)
}
+ // use static user-data if configured, otherwise use the consumer-group user-data from the last sync
+ userData := c.config.Consumer.Group.Member.UserData
+ if len(userData) == 0 {
+ userData = c.userData
+ }
meta := &ConsumerGroupMemberMetadata{
Topics: topics,
- UserData: c.config.Consumer.Group.Member.UserData,
+ UserData: userData,
}
strategy := c.config.Consumer.Group.Rebalance.Strategy
if err := req.AddGroupProtocolMetadata(strategy.Name(), meta); err != nil {
@@ -321,13 +334,17 @@
MemberId: c.memberID,
GenerationId: generationID,
}
+ strategy := c.config.Consumer.Group.Rebalance.Strategy
for memberID, topics := range plan {
- err := req.AddGroupAssignmentMember(memberID, &ConsumerGroupMemberAssignment{
- Topics: topics,
- })
+ assignment := &ConsumerGroupMemberAssignment{Topics: topics}
+ userDataBytes, err := strategy.AssignmentData(memberID, topics, generationID)
if err != nil {
return nil, err
}
+ assignment.UserData = userDataBytes
+ if err := req.AddGroupAssignmentMember(memberID, assignment); err != nil {
+ return nil, err
+ }
}
return coordinator.SyncGroup(req)
}
@@ -362,8 +379,10 @@
return strategy.Plan(members, topics)
}
-// Leaves the cluster, called by Close, protected by lock.
+// Leaves the cluster, called by Close.
func (c *consumerGroup) leave() error {
+ c.lock.Lock()
+ defer c.lock.Unlock()
if c.memberID == "" {
return nil
}
@@ -395,12 +414,6 @@
}
func (c *consumerGroup) handleError(err error, topic string, partition int32) {
- select {
- case <-c.closed:
- return
- default:
- }
-
if _, ok := err.(*ConsumerError); !ok && topic != "" && partition > -1 {
err = &ConsumerError{
Topic: topic,
@@ -409,14 +422,67 @@
}
}
- if c.config.Consumer.Return.Errors {
- select {
- case c.errors <- err:
- default:
- }
- } else {
+ if !c.config.Consumer.Return.Errors {
Logger.Println(err)
+ return
}
+
+ select {
+ case <-c.closed:
+ // consumer is closed
+ return
+ default:
+ }
+
+ select {
+ case c.errors <- err:
+ default:
+ // no error listener
+ }
+}
+
+func (c *consumerGroup) loopCheckPartitionNumbers(topics []string, session *consumerGroupSession) {
+ pause := time.NewTicker(c.config.Metadata.RefreshFrequency)
+ defer session.cancel()
+ defer pause.Stop()
+ var oldTopicToPartitionNum map[string]int
+ var err error
+ if oldTopicToPartitionNum, err = c.topicToPartitionNumbers(topics); err != nil {
+ return
+ }
+ for {
+ if newTopicToPartitionNum, err := c.topicToPartitionNumbers(topics); err != nil {
+ return
+ } else {
+ for topic, num := range oldTopicToPartitionNum {
+ if newTopicToPartitionNum[topic] != num {
+ return // trigger the end of the session on exit
+ }
+ }
+ }
+ select {
+ case <-pause.C:
+ case <-session.ctx.Done():
+ Logger.Printf("loop check partition number coroutine will exit, topics %s", topics)
+ // if the session was closed elsewhere, exit this goroutine
+ return
+ case <-c.closed:
+ return
+ }
+ }
+}
+
+func (c *consumerGroup) topicToPartitionNumbers(topics []string) (map[string]int, error) {
+ topicToPartitionNum := make(map[string]int, len(topics))
+ for _, topic := range topics {
+ if partitionNum, err := c.client.Partitions(topic); err != nil {
+ Logger.Printf("Consumer Group topic %s get partition number failed %v", topic, err)
+ return nil, err
+ } else {
+ topicToPartitionNum[topic] = len(partitionNum)
+ }
+ }
+ return topicToPartitionNum, nil
}
// --------------------------------------------------------------------
@@ -447,6 +513,11 @@
// message twice, and your processing should ideally be idempotent.
MarkOffset(topic string, partition int32, offset int64, metadata string)
+ // Commit the offset to the backend
+ //
+ // Note: calling Commit performs a blocking synchronous operation.
+ Commit()
+
// ResetOffset resets to the provided offset, alongside a metadata string that
// represents the state of the partition consumer at that point in time. Reset
// acts as a counterpart to MarkOffset, the difference being that it allows to
@@ -558,6 +629,10 @@
}
}
+func (s *consumerGroupSession) Commit() {
+ s.offsets.Commit()
+}
+
func (s *consumerGroupSession) ResetOffset(topic string, partition int32, offset int64, metadata string) {
if pom := s.offsets.findPOM(topic, partition); pom != nil {
pom.ResetOffset(offset, metadata)
@@ -657,6 +732,9 @@
pause := time.NewTicker(s.parent.config.Consumer.Group.Heartbeat.Interval)
defer pause.Stop()
+ retryBackoff := time.NewTimer(s.parent.config.Metadata.Retry.Backoff)
+ defer retryBackoff.Stop()
+
retries := s.parent.config.Metadata.Retry.Max
for {
coordinator, err := s.parent.client.Coordinator(s.parent.groupID)
@@ -665,11 +743,11 @@
s.parent.handleError(err, "", -1)
return
}
-
+ retryBackoff.Reset(s.parent.config.Metadata.Retry.Backoff)
select {
case <-s.hbDying:
return
- case <-time.After(s.parent.config.Metadata.Retry.Backoff):
+ case <-retryBackoff.C:
retries--
}
continue
@@ -694,7 +772,7 @@
case ErrRebalanceInProgress, ErrUnknownMemberId, ErrIllegalGeneration:
return
default:
- s.parent.handleError(err, "", -1)
+ s.parent.handleError(resp.Err, "", -1)
return
}