William Kurkian | ea86948 | 2019-04-09 15:16:11 -0400 | [diff] [blame] | 1 | package sarama |
| 2 | |
| 3 | import ( |
| 4 | "math" |
| 5 | "sort" |
| 6 | ) |
| 7 | |
| 8 | // BalanceStrategyPlan is the results of any BalanceStrategy.Plan attempt. |
| 9 | // It contains an allocation of topic/partitions by memberID in the form of |
| 10 | // a `memberID -> topic -> partitions` map. |
| 11 | type BalanceStrategyPlan map[string]map[string][]int32 |
| 12 | |
| 13 | // Add assigns a topic with a number partitions to a member. |
| 14 | func (p BalanceStrategyPlan) Add(memberID, topic string, partitions ...int32) { |
| 15 | if len(partitions) == 0 { |
| 16 | return |
| 17 | } |
| 18 | if _, ok := p[memberID]; !ok { |
| 19 | p[memberID] = make(map[string][]int32, 1) |
| 20 | } |
| 21 | p[memberID][topic] = append(p[memberID][topic], partitions...) |
| 22 | } |
| 23 | |
| 24 | // -------------------------------------------------------------------- |
| 25 | |
| 26 | // BalanceStrategy is used to balance topics and partitions |
Abhilash S.L | 3b49463 | 2019-07-16 15:51:09 +0530 | [diff] [blame] | 27 | // across members of a consumer group |
William Kurkian | ea86948 | 2019-04-09 15:16:11 -0400 | [diff] [blame] | 28 | type BalanceStrategy interface { |
| 29 | // Name uniquely identifies the strategy. |
| 30 | Name() string |
| 31 | |
| 32 | // Plan accepts a map of `memberID -> metadata` and a map of `topic -> partitions` |
| 33 | // and returns a distribution plan. |
| 34 | Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) |
| 35 | } |
| 36 | |
| 37 | // -------------------------------------------------------------------- |
| 38 | |
| 39 | // BalanceStrategyRange is the default and assigns partitions as ranges to consumer group members. |
| 40 | // Example with one topic T with six partitions (0..5) and two members (M1, M2): |
| 41 | // M1: {T: [0, 1, 2]} |
| 42 | // M2: {T: [3, 4, 5]} |
| 43 | var BalanceStrategyRange = &balanceStrategy{ |
| 44 | name: "range", |
| 45 | coreFn: func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32) { |
| 46 | step := float64(len(partitions)) / float64(len(memberIDs)) |
| 47 | |
| 48 | for i, memberID := range memberIDs { |
| 49 | pos := float64(i) |
| 50 | min := int(math.Floor(pos*step + 0.5)) |
| 51 | max := int(math.Floor((pos+1)*step + 0.5)) |
| 52 | plan.Add(memberID, topic, partitions[min:max]...) |
| 53 | } |
| 54 | }, |
| 55 | } |
| 56 | |
| 57 | // BalanceStrategyRoundRobin assigns partitions to members in alternating order. |
| 58 | // Example with topic T with six partitions (0..5) and two members (M1, M2): |
| 59 | // M1: {T: [0, 2, 4]} |
| 60 | // M2: {T: [1, 3, 5]} |
| 61 | var BalanceStrategyRoundRobin = &balanceStrategy{ |
| 62 | name: "roundrobin", |
| 63 | coreFn: func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32) { |
| 64 | for i, part := range partitions { |
| 65 | memberID := memberIDs[i%len(memberIDs)] |
| 66 | plan.Add(memberID, topic, part) |
| 67 | } |
| 68 | }, |
| 69 | } |
| 70 | |
| 71 | // -------------------------------------------------------------------- |
| 72 | |
| 73 | type balanceStrategy struct { |
| 74 | name string |
| 75 | coreFn func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32) |
| 76 | } |
| 77 | |
| 78 | // Name implements BalanceStrategy. |
| 79 | func (s *balanceStrategy) Name() string { return s.name } |
| 80 | |
Abhilash S.L | 3b49463 | 2019-07-16 15:51:09 +0530 | [diff] [blame] | 81 | // Plan implements BalanceStrategy. |
William Kurkian | ea86948 | 2019-04-09 15:16:11 -0400 | [diff] [blame] | 82 | func (s *balanceStrategy) Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) { |
| 83 | // Build members by topic map |
| 84 | mbt := make(map[string][]string) |
| 85 | for memberID, meta := range members { |
| 86 | for _, topic := range meta.Topics { |
| 87 | mbt[topic] = append(mbt[topic], memberID) |
| 88 | } |
| 89 | } |
| 90 | |
| 91 | // Sort members for each topic |
| 92 | for topic, memberIDs := range mbt { |
| 93 | sort.Sort(&balanceStrategySortable{ |
| 94 | topic: topic, |
| 95 | memberIDs: memberIDs, |
| 96 | }) |
| 97 | } |
| 98 | |
| 99 | // Assemble plan |
| 100 | plan := make(BalanceStrategyPlan, len(members)) |
| 101 | for topic, memberIDs := range mbt { |
| 102 | s.coreFn(plan, memberIDs, topic, topics[topic]) |
| 103 | } |
| 104 | return plan, nil |
| 105 | } |
| 106 | |
| 107 | type balanceStrategySortable struct { |
| 108 | topic string |
| 109 | memberIDs []string |
| 110 | } |
| 111 | |
| 112 | func (p balanceStrategySortable) Len() int { return len(p.memberIDs) } |
| 113 | func (p balanceStrategySortable) Swap(i, j int) { |
| 114 | p.memberIDs[i], p.memberIDs[j] = p.memberIDs[j], p.memberIDs[i] |
| 115 | } |
| 116 | func (p balanceStrategySortable) Less(i, j int) bool { |
| 117 | return balanceStrategyHashValue(p.topic, p.memberIDs[i]) < balanceStrategyHashValue(p.topic, p.memberIDs[j]) |
| 118 | } |
| 119 | |
| 120 | func balanceStrategyHashValue(vv ...string) uint32 { |
| 121 | h := uint32(2166136261) |
| 122 | for _, s := range vv { |
| 123 | for _, c := range s { |
| 124 | h ^= uint32(c) |
| 125 | h *= 16777619 |
| 126 | } |
| 127 | } |
| 128 | return h |
| 129 | } |