SEBA-902 single-olt tests;
Pin protoc-gen-go to 1.3.2 to resolve compatibility issue;
Run go mod tidy / go mod vendor on importer;
Add Go Module support to demotest
Change-Id: Ifde824fc9a6317b0adc1e12bea54ee1f9b788906
diff --git a/vendor/github.com/Shopify/sarama/consumer_group.go b/vendor/github.com/Shopify/sarama/consumer_group.go
index b974dd9..951f64b 100644
--- a/vendor/github.com/Shopify/sarama/consumer_group.go
+++ b/vendor/github.com/Shopify/sarama/consumer_group.go
@@ -120,9 +120,6 @@
c.closeOnce.Do(func() {
close(c.closed)
- c.lock.Lock()
- defer c.lock.Unlock()
-
// leave group
if e := c.leave(); e != nil {
err = e
@@ -175,6 +172,7 @@
// loop check topic partition numbers changed
// will trigger rebalance when any topic partitions number had changed
+ // if Consume is called again, avoid starting more than one loopCheckPartitionNumbers goroutine
go c.loopCheckPartitionNumbers(topics, sess)
// Wait for session exit signal
@@ -333,20 +331,14 @@
MemberId: c.memberID,
GenerationId: generationID,
}
+ strategy := c.config.Consumer.Group.Rebalance.Strategy
for memberID, topics := range plan {
assignment := &ConsumerGroupMemberAssignment{Topics: topics}
-
- // Include topic assignments in group-assignment userdata for each consumer-group member
- if c.config.Consumer.Group.Rebalance.Strategy.Name() == StickyBalanceStrategyName {
- userDataBytes, err := encode(&StickyAssignorUserDataV1{
- Topics: topics,
- Generation: generationID,
- }, nil)
- if err != nil {
- return nil, err
- }
- assignment.UserData = userDataBytes
+ userDataBytes, err := strategy.AssignmentData(memberID, topics, generationID)
+ if err != nil {
+ return nil, err
}
+ assignment.UserData = userDataBytes
if err := req.AddGroupAssignmentMember(memberID, assignment); err != nil {
return nil, err
}
@@ -384,8 +376,10 @@
return strategy.Plan(members, topics)
}
-// Leaves the cluster, called by Close, protected by lock.
+// Leaves the cluster, called by Close.
func (c *consumerGroup) leave() error {
+ c.lock.Lock()
+ defer c.lock.Unlock()
if c.memberID == "" {
return nil
}
@@ -430,9 +424,6 @@
return
}
- c.lock.Lock()
- defer c.lock.Unlock()
-
select {
case <-c.closed:
//consumer is closed
@@ -448,7 +439,7 @@
}
func (c *consumerGroup) loopCheckPartitionNumbers(topics []string, session *consumerGroupSession) {
- pause := time.NewTicker(c.config.Consumer.Group.Heartbeat.Interval * 2)
+ pause := time.NewTicker(c.config.Metadata.RefreshFrequency)
defer session.cancel()
defer pause.Stop()
var oldTopicToPartitionNum map[string]int
@@ -468,6 +459,10 @@
}
select {
case <-pause.C:
+ case <-session.ctx.Done():
+ Logger.Printf("loop check partition number coroutine will exit, topics %s", topics)
+ // the session was closed elsewhere, so this goroutine should exit as well
+ return
case <-c.closed:
return
}
@@ -475,10 +470,6 @@
}
func (c *consumerGroup) topicToPartitionNumbers(topics []string) (map[string]int, error) {
- if err := c.client.RefreshMetadata(topics...); err != nil {
- Logger.Printf("Consumer Group refresh metadata failed %v", err)
- return nil, err
- }
topicToPartitionNum := make(map[string]int, len(topics))
for _, topic := range topics {
if partitionNum, err := c.client.Partitions(topic); err != nil {