blob: e78988d7181cecdadc23522743279de6e9f526ab [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001package sarama
2
3import (
4 "math"
5 "sort"
6)
7
// BalanceStrategyPlan is the result of any BalanceStrategy.Plan attempt.
// It contains an allocation of topic/partitions by memberID in the form of
// a `memberID -> topic -> partitions` map.
type BalanceStrategyPlan map[string]map[string][]int32

// Add assigns the given partitions of a topic to a member. The partitions are
// appended to whatever is already recorded for that member and topic; calling
// Add without any partitions leaves the plan untouched.
func (p BalanceStrategyPlan) Add(memberID, topic string, partitions ...int32) {
	if len(partitions) == 0 {
		return
	}

	// Create the member's per-topic map lazily on first assignment.
	byTopic, found := p[memberID]
	if !found {
		byTopic = make(map[string][]int32, 1)
		p[memberID] = byTopic
	}
	byTopic[topic] = append(byTopic[topic], partitions...)
}
23
24// --------------------------------------------------------------------
25
// BalanceStrategy is used to balance topics and partitions
// across members of a consumer group.
type BalanceStrategy interface {
	// Name uniquely identifies the strategy.
	Name() string

	// Plan accepts a map of `memberID -> metadata` and a map of `topic -> partitions`
	// and returns a distribution plan in the form of a
	// `memberID -> topic -> partitions` map, or an error.
	Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error)
}
36
37// --------------------------------------------------------------------
38
39// BalanceStrategyRange is the default and assigns partitions as ranges to consumer group members.
40// Example with one topic T with six partitions (0..5) and two members (M1, M2):
41// M1: {T: [0, 1, 2]}
42// M2: {T: [3, 4, 5]}
43var BalanceStrategyRange = &balanceStrategy{
44 name: "range",
45 coreFn: func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32) {
46 step := float64(len(partitions)) / float64(len(memberIDs))
47
48 for i, memberID := range memberIDs {
49 pos := float64(i)
50 min := int(math.Floor(pos*step + 0.5))
51 max := int(math.Floor((pos+1)*step + 0.5))
52 plan.Add(memberID, topic, partitions[min:max]...)
53 }
54 },
55}
56
57// BalanceStrategyRoundRobin assigns partitions to members in alternating order.
58// Example with topic T with six partitions (0..5) and two members (M1, M2):
59// M1: {T: [0, 2, 4]}
60// M2: {T: [1, 3, 5]}
61var BalanceStrategyRoundRobin = &balanceStrategy{
62 name: "roundrobin",
63 coreFn: func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32) {
64 for i, part := range partitions {
65 memberID := memberIDs[i%len(memberIDs)]
66 plan.Add(memberID, topic, part)
67 }
68 },
69}
70
71// --------------------------------------------------------------------
72
73type balanceStrategy struct {
74 name string
75 coreFn func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32)
76}
77
78// Name implements BalanceStrategy.
79func (s *balanceStrategy) Name() string { return s.name }
80
81// Balance implements BalanceStrategy.
82func (s *balanceStrategy) Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {
83 // Build members by topic map
84 mbt := make(map[string][]string)
85 for memberID, meta := range members {
86 for _, topic := range meta.Topics {
87 mbt[topic] = append(mbt[topic], memberID)
88 }
89 }
90
91 // Sort members for each topic
92 for topic, memberIDs := range mbt {
93 sort.Sort(&balanceStrategySortable{
94 topic: topic,
95 memberIDs: memberIDs,
96 })
97 }
98
99 // Assemble plan
100 plan := make(BalanceStrategyPlan, len(members))
101 for topic, memberIDs := range mbt {
102 s.coreFn(plan, memberIDs, topic, topics[topic])
103 }
104 return plan, nil
105}
106
107type balanceStrategySortable struct {
108 topic string
109 memberIDs []string
110}
111
112func (p balanceStrategySortable) Len() int { return len(p.memberIDs) }
113func (p balanceStrategySortable) Swap(i, j int) {
114 p.memberIDs[i], p.memberIDs[j] = p.memberIDs[j], p.memberIDs[i]
115}
116func (p balanceStrategySortable) Less(i, j int) bool {
117 return balanceStrategyHashValue(p.topic, p.memberIDs[i]) < balanceStrategyHashValue(p.topic, p.memberIDs[j])
118}
119
// balanceStrategyHashValue folds the runes of all given strings, in order,
// into a single 32-bit value: each rune is XORed into the running value,
// which is then multiplied by a fixed prime (wrapping on overflow).
// With no input it returns the initial value 2166136261.
func balanceStrategyHashValue(vv ...string) uint32 {
	const (
		offsetBasis uint32 = 2166136261
		prime       uint32 = 16777619
	)

	sum := offsetBasis
	for i := range vv {
		for _, r := range vv[i] {
			sum = (sum ^ uint32(r)) * prime
		}
	}
	return sum
}