blob: 3aeaecef7a379482fe79d20ffd48b3c86588a608 [file] [log] [blame]
Takahiro Suzuki241c10e2020-12-17 20:17:57 +09001package cluster
2
3import (
4 "math"
5 "sort"
6
7 "github.com/Shopify/sarama"
8)
9
// NotificationType identifies the kind of rebalance event carried by a
// Notification.
type NotificationType uint8

// Known notification types, numbered from zero in declaration order.
const (
	// UnknownNotification is the zero value; String reports it as "unknown".
	UnknownNotification NotificationType = iota
	// RebalanceStart is reported by String as "rebalance start".
	RebalanceStart
	// RebalanceOK is reported by String as "rebalance OK".
	RebalanceOK
	// RebalanceError is reported by String as "rebalance error".
	RebalanceError
)

// String returns a short human-readable label for t. Values without a
// dedicated label, including UnknownNotification, yield "unknown".
func (t NotificationType) String() string {
	// Index 0 (UnknownNotification) is deliberately left empty and handled
	// by the guard below together with out-of-range values.
	labels := [...]string{
		RebalanceStart: "rebalance start",
		RebalanceOK:    "rebalance OK",
		RebalanceError: "rebalance error",
	}
	if t == UnknownNotification || int(t) >= len(labels) {
		return "unknown"
	}
	return labels[t]
}
32
33// Notification are state events emitted by the consumers on rebalance
34type Notification struct {
35 // Type exposes the notification type
36 Type NotificationType
37
38 // Claimed contains topic/partitions that were claimed by this rebalance cycle
39 Claimed map[string][]int32
40
41 // Released contains topic/partitions that were released as part of this rebalance cycle
42 Released map[string][]int32
43
44 // Current are topic/partitions that are currently claimed to the consumer
45 Current map[string][]int32
46}
47
48func newNotification(current map[string][]int32) *Notification {
49 return &Notification{
50 Type: RebalanceStart,
51 Current: current,
52 }
53}
54
55func (n *Notification) success(current map[string][]int32) *Notification {
56 o := &Notification{
57 Type: RebalanceOK,
58 Claimed: make(map[string][]int32),
59 Released: make(map[string][]int32),
60 Current: current,
61 }
62 for topic, partitions := range current {
63 o.Claimed[topic] = int32Slice(partitions).Diff(int32Slice(n.Current[topic]))
64 }
65 for topic, partitions := range n.Current {
66 o.Released[topic] = int32Slice(partitions).Diff(int32Slice(current[topic]))
67 }
68 return o
69}
70
71// --------------------------------------------------------------------
72
73type topicInfo struct {
74 Partitions []int32
75 MemberIDs []string
76}
77
78func (info topicInfo) Perform(s Strategy) map[string][]int32 {
79 if s == StrategyRoundRobin {
80 return info.RoundRobin()
81 }
82 return info.Ranges()
83}
84
85func (info topicInfo) Ranges() map[string][]int32 {
86 sort.Strings(info.MemberIDs)
87
88 mlen := len(info.MemberIDs)
89 plen := len(info.Partitions)
90 res := make(map[string][]int32, mlen)
91
92 for pos, memberID := range info.MemberIDs {
93 n, i := float64(plen)/float64(mlen), float64(pos)
94 min := int(math.Floor(i*n + 0.5))
95 max := int(math.Floor((i+1)*n + 0.5))
96 sub := info.Partitions[min:max]
97 if len(sub) > 0 {
98 res[memberID] = sub
99 }
100 }
101 return res
102}
103
104func (info topicInfo) RoundRobin() map[string][]int32 {
105 sort.Strings(info.MemberIDs)
106
107 mlen := len(info.MemberIDs)
108 res := make(map[string][]int32, mlen)
109 for i, pnum := range info.Partitions {
110 memberID := info.MemberIDs[i%mlen]
111 res[memberID] = append(res[memberID], pnum)
112 }
113 return res
114}
115
116// --------------------------------------------------------------------
117
118type balancer struct {
119 client sarama.Client
120 topics map[string]topicInfo
121}
122
123func newBalancerFromMeta(client sarama.Client, members map[string]sarama.ConsumerGroupMemberMetadata) (*balancer, error) {
124 balancer := newBalancer(client)
125 for memberID, meta := range members {
126 for _, topic := range meta.Topics {
127 if err := balancer.Topic(topic, memberID); err != nil {
128 return nil, err
129 }
130 }
131 }
132 return balancer, nil
133}
134
135func newBalancer(client sarama.Client) *balancer {
136 return &balancer{
137 client: client,
138 topics: make(map[string]topicInfo),
139 }
140}
141
142func (r *balancer) Topic(name string, memberID string) error {
143 topic, ok := r.topics[name]
144 if !ok {
145 nums, err := r.client.Partitions(name)
146 if err != nil {
147 return err
148 }
149 topic = topicInfo{
150 Partitions: nums,
151 MemberIDs: make([]string, 0, 1),
152 }
153 }
154 topic.MemberIDs = append(topic.MemberIDs, memberID)
155 r.topics[name] = topic
156 return nil
157}
158
159func (r *balancer) Perform(s Strategy) map[string]map[string][]int32 {
160 res := make(map[string]map[string][]int32, 1)
161 for topic, info := range r.topics {
162 for memberID, partitions := range info.Perform(s) {
163 if _, ok := res[memberID]; !ok {
164 res[memberID] = make(map[string][]int32, 1)
165 }
166 res[memberID][topic] = partitions
167 }
168 }
169 return res
170}