gRPC migration update
Change-Id: Icdd1a824948fa994cd36bd121c962f5ecf74e3cf
diff --git a/vendor/github.com/Shopify/sarama/balance_strategy.go b/vendor/github.com/Shopify/sarama/balance_strategy.go
index 67c4d96..9855bf4 100644
--- a/vendor/github.com/Shopify/sarama/balance_strategy.go
+++ b/vendor/github.com/Shopify/sarama/balance_strategy.go
@@ -2,6 +2,8 @@
import (
"container/heap"
+ "errors"
+ "fmt"
"math"
"sort"
"strings"
@@ -47,6 +49,10 @@
// Plan accepts a map of `memberID -> metadata` and a map of `topic -> partitions`
// and returns a distribution plan.
Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error)
+
+ // AssignmentData returns the serialized assignment data for the specified
+ // memberID
+ AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error)
}
// --------------------------------------------------------------------
@@ -69,20 +75,6 @@
},
}
-// BalanceStrategyRoundRobin assigns partitions to members in alternating order.
-// Example with topic T with six partitions (0..5) and two members (M1, M2):
-// M1: {T: [0, 2, 4]}
-// M2: {T: [1, 3, 5]}
-var BalanceStrategyRoundRobin = &balanceStrategy{
- name: RoundRobinBalanceStrategyName,
- coreFn: func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32) {
- for i, part := range partitions {
- memberID := memberIDs[i%len(memberIDs)]
- plan.Add(memberID, topic, part)
- }
- },
-}
-
// BalanceStrategySticky assigns partitions to members with an attempt to preserve earlier assignments
// while maintain a balanced partition distribution.
// Example with topic T with six partitions (0..5) and two members (M1, M2):
@@ -132,6 +124,11 @@
return plan, nil
}
+// AssignmentData always returns nil data and a nil error: the simple
+// strategies do not attach any shared assignment data to a member.
+func (s *balanceStrategy) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) {
+ return nil, nil
+}
+
type balanceStrategySortable struct {
topic string
memberIDs []string
@@ -141,6 +138,7 @@
+// Swap exchanges the member IDs stored at indexes i and j.
func (p balanceStrategySortable) Swap(i, j int) {
p.memberIDs[i], p.memberIDs[j] = p.memberIDs[j], p.memberIDs[i]
}
+
+// Less reports whether the member ID at index i has a smaller
+// balanceStrategyHashValue for p.topic than the member ID at index j.
func (p balanceStrategySortable) Less(i, j int) bool {
return balanceStrategyHashValue(p.topic, p.memberIDs[i]) < balanceStrategyHashValue(p.topic, p.memberIDs[j])
}
@@ -258,7 +256,7 @@
plan := make(BalanceStrategyPlan, len(currentAssignment))
for memberID, assignments := range currentAssignment {
if len(assignments) == 0 {
- plan[memberID] = make(map[string][]int32, 0)
+ plan[memberID] = make(map[string][]int32)
} else {
for _, assignment := range assignments {
plan.Add(memberID, assignment.Topic, assignment.Partition)
@@ -268,6 +266,15 @@
return plan, nil
}
+// AssignmentData encodes the supplied topics map and generationID as a
+// StickyAssignorUserDataV1 and returns whatever encode returns for it.
+// memberID is not read here, so topics is presumably already the assignment
+// of that member (NOTE(review): confirm against the caller).
+func (s *stickyBalanceStrategy) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) {
+ return encode(&StickyAssignorUserDataV1{
+ Topics: topics,
+ Generation: generationID,
+ }, nil)
+}
+
func strsContains(s []string, value string) bool {
for _, entry := range s {
if entry == value {
@@ -335,6 +342,92 @@
}
}
+// BalanceStrategyRoundRobin assigns partitions to members in alternating order.
+// For example, with two topics (t0, t1), two consumers (m0, m1), and three partitions per topic (p0, p1, p2):
+// m0: [t0p0, t0p2, t1p1]
+// m1: [t0p1, t1p0, t1p2]
+var BalanceStrategyRoundRobin = new(roundRobinBalancer)
+
+// roundRobinBalancer is the type behind BalanceStrategyRoundRobin; it has no fields.
+type roundRobinBalancer struct{}
+
+// Name returns RoundRobinBalanceStrategyName.
+func (b *roundRobinBalancer) Name() string {
+ return RoundRobinBalanceStrategyName
+}
+
+// Plan distributes the partitions in topics across the members in
+// memberAndMetadata in round-robin order. Partitions are visited in the order
+// of their "topic-partition" string key and members in ascending memberID
+// order; a partition is only handed to a member whose metadata lists the
+// partition's topic. Partitions of a topic that no member subscribes to are
+// left out of the plan.
+// It returns an error when either map is empty.
+func (b *roundRobinBalancer) Plan(memberAndMetadata map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {
+	if len(memberAndMetadata) == 0 || len(topics) == 0 {
+		return nil, errors.New("members and topics are not provided")
+	}
+	// sort partitions so the result does not depend on map iteration order
+	var topicPartitions []topicAndPartition
+	for topic, partitions := range topics {
+		for _, partition := range partitions {
+			topicPartitions = append(topicPartitions, topicAndPartition{topic: topic, partition: partition})
+		}
+	}
+	sort.SliceStable(topicPartitions, func(i, j int) bool {
+		pi := topicPartitions[i]
+		pj := topicPartitions[j]
+		return pi.comparedValue() < pj.comparedValue()
+	})
+
+	// sort members, recording each member's subscribed topics as a set
+	var members []memberAndTopic
+	for memberID, meta := range memberAndMetadata {
+		m := memberAndTopic{
+			memberID: memberID,
+			topics:   make(map[string]struct{}),
+		}
+		for _, t := range meta.Topics {
+			m.topics[t] = struct{}{}
+		}
+		members = append(members, m)
+	}
+	sort.SliceStable(members, func(i, j int) bool {
+		mi := members[i]
+		mj := members[j]
+		return mi.memberID < mj.memberID
+	})
+
+	// assign partitions
+	plan := make(BalanceStrategyPlan, len(members))
+	i := 0
+	n := len(members)
+	for _, tp := range topicPartitions {
+		// Probe at most n members starting at the cursor. Without this bound
+		// the search would spin forever on a topic that nobody subscribes to.
+		// After a full unsuccessful pass the cursor has advanced by n, so the
+		// next partition starts from the same member.
+		for tried := 0; tried < n; tried++ {
+			m := members[i%n]
+			i++
+			if m.hasTopic(tp.topic) {
+				plan.Add(m.memberID, tp.topic, tp.partition)
+				break
+			}
+		}
+	}
+	return plan, nil
+}
+
+// AssignmentData returns nil data and a nil error; all three arguments are ignored.
+func (b *roundRobinBalancer) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) {
+ return nil, nil // do nothing for now
+}
+
+// topicAndPartition identifies one partition of one topic.
+type topicAndPartition struct {
+ topic string
+ partition int32
+}
+
+// comparedValue returns the "<topic>-<partition>" string used as the sort key
+// for partitions. The key is compared as a string, so for the same topic
+// partition 10 orders before partition 2.
+func (tp *topicAndPartition) comparedValue() string {
+ return fmt.Sprintf("%s-%d", tp.topic, tp.partition)
+}
+
+// memberAndTopic pairs a member ID with a set of topic names.
+type memberAndTopic struct {
+ memberID string
+ topics map[string]struct{}
+}
+
+// hasTopic reports whether topic is a key of m.topics.
+func (m *memberAndTopic) hasTopic(topic string) bool {
+ _, isExist := m.topics[topic]
+ return isExist
+}
+
// Calculate the balance score of the given assignment, as the sum of assigned partitions size difference of all consumer pairs.
// A perfectly balanced assignment (with all consumers getting the same number of partitions) has a balance score of 0.
// Lower balance score indicates a more balanced assignment.
@@ -355,8 +448,8 @@
}
// Determine whether the current assignment plan is balanced.
-func isBalanced(currentAssignment map[string][]topicPartitionAssignment, sortedCurrentSubscriptions []string, allSubscriptions map[string][]topicPartitionAssignment) bool {
- sortedCurrentSubscriptions = sortMemberIDsByPartitionAssignments(currentAssignment)
+func isBalanced(currentAssignment map[string][]topicPartitionAssignment, allSubscriptions map[string][]topicPartitionAssignment) bool {
+ sortedCurrentSubscriptions := sortMemberIDsByPartitionAssignments(currentAssignment)
min := len(currentAssignment[sortedCurrentSubscriptions[0]])
max := len(currentAssignment[sortedCurrentSubscriptions[len(sortedCurrentSubscriptions)-1]])
if min >= max-1 {
@@ -412,7 +505,7 @@
// reassign all reassignable partitions (starting from the partition with least potential consumers and if needed)
// until the full list is processed or a balance is achieved
for _, partition := range reassignablePartitions {
- if isBalanced(currentAssignment, sortedCurrentSubscriptions, consumer2AllPotentialPartitions) {
+ if isBalanced(currentAssignment, consumer2AllPotentialPartitions) {
break
}
@@ -671,20 +764,12 @@
return sortedPartionIDs
}
-func deepCopyPartitions(src []topicPartitionAssignment) []topicPartitionAssignment {
- dst := make([]topicPartitionAssignment, len(src))
- for i, partition := range src {
- dst[i] = partition
- }
- return dst
-}
-
+// deepCopyAssignment returns a new map that holds, for every member ID in
+// assignment, a copy of that member's slice of partition assignments.
func deepCopyAssignment(assignment map[string][]topicPartitionAssignment) map[string][]topicPartitionAssignment {
- copy := make(map[string][]topicPartitionAssignment, len(assignment))
+ m := make(map[string][]topicPartitionAssignment, len(assignment))
for memberID, subscriptions := range assignment {
- copy[memberID] = append(subscriptions[:0:0], subscriptions...)
+ // subscriptions[:0:0] has zero length and zero capacity, so append copies
+ // the elements into a newly allocated backing array instead of aliasing the source.
+ m[memberID] = append(subscriptions[:0:0], subscriptions...)
}
- return copy
+ return m
}
func areSubscriptionsIdentical(partition2AllPotentialConsumers map[topicPartitionAssignment][]string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment) bool {
@@ -938,9 +1023,7 @@
for i := 0; i < len(cycle)-1; i++ {
superCycle[i] = cycle[i]
}
- for _, c := range cycle {
- superCycle = append(superCycle, c)
- }
+ superCycle = append(superCycle, cycle...)
for _, foundCycle := range cycles {
if len(foundCycle) == len(cycle) && indexOfSubList(superCycle, foundCycle) != -1 {
return true