blob: 3aeaecef7a379482fe79d20ffd48b3c86588a608 [file] [log] [blame]
package cluster
import (
"math"
"sort"
"github.com/Shopify/sarama"
)
// NotificationType identifies the kind of event a Notification carries.
type NotificationType uint8

// Known notification types. UnknownNotification is the zero value.
const (
	// UnknownNotification is the zero value; it has no dedicated label.
	UnknownNotification NotificationType = iota
	// RebalanceStart is the type assigned to freshly created notifications.
	RebalanceStart
	// RebalanceOK is the type of notifications derived via success.
	RebalanceOK
	// RebalanceError is rendered as "rebalance error"; presumably emitted
	// when a rebalance fails.
	RebalanceError
)

// String returns a human-readable label for the notification type. Values
// without a dedicated label, including UnknownNotification, yield "unknown".
func (t NotificationType) String() string {
	label := "unknown"
	switch t {
	case RebalanceStart:
		label = "rebalance start"
	case RebalanceOK:
		label = "rebalance OK"
	case RebalanceError:
		label = "rebalance error"
	}
	return label
}
// Notification is a state event emitted by the consumers on rebalance.
// All three maps are keyed by topic name and hold partition numbers.
type Notification struct {
// Type exposes the notification type.
Type NotificationType
// Claimed contains the topic/partitions that were claimed by this rebalance cycle.
// It is left nil on notifications built by newNotification.
Claimed map[string][]int32
// Released contains the topic/partitions that were released as part of this rebalance cycle.
// It is left nil on notifications built by newNotification.
Released map[string][]int32
// Current contains the topic/partitions that are currently claimed by the consumer.
Current map[string][]int32
}
// newNotification builds a RebalanceStart notification that records the
// given topic/partition claims as Current. Claimed and Released stay nil.
func newNotification(current map[string][]int32) *Notification {
	n := new(Notification)
	n.Type = RebalanceStart
	n.Current = current
	return n
}
// success derives a RebalanceOK notification from n, treating n.Current as
// the claims held before the rebalance and current as the claims held after.
//
// For every topic in current, Claimed holds the Diff of the new partitions
// against the previous ones; for every topic in n.Current, Released holds the
// Diff of the previous partitions against the new ones. Presumably Diff
// yields the elements of its receiver that are missing from its argument;
// confirm against int32Slice. The receiver itself is not modified.
func (n *Notification) success(current map[string][]int32) *Notification {
	claimed := make(map[string][]int32)
	for topic, after := range current {
		before := int32Slice(n.Current[topic])
		claimed[topic] = int32Slice(after).Diff(before)
	}

	released := make(map[string][]int32)
	for topic, before := range n.Current {
		after := int32Slice(current[topic])
		released[topic] = int32Slice(before).Diff(after)
	}

	return &Notification{
		Type:     RebalanceOK,
		Claimed:  claimed,
		Released: released,
		Current:  current,
	}
}
// --------------------------------------------------------------------
// topicInfo pairs the partitions of a single topic with the IDs of the
// members that were registered for it; it is the input to the assignment
// strategies (Ranges and RoundRobin).
type topicInfo struct {
// Partitions lists the partition numbers of the topic.
Partitions []int32
// MemberIDs lists the registered members; the strategies sort it in place.
MemberIDs []string
}
// Perform computes the member -> partitions assignment for this topic using
// strategy s: StrategyRoundRobin selects RoundRobin, and every other value
// falls back to Ranges.
func (info topicInfo) Perform(s Strategy) map[string][]int32 {
	switch s {
	case StrategyRoundRobin:
		return info.RoundRobin()
	default:
		return info.Ranges()
	}
}
// Ranges assigns each member one contiguous run of partitions.
//
// MemberIDs is sorted in place first (the backing array is shared with the
// caller's value), so the outcome does not depend on registration order.
// With P partitions and M members, the member at sorted position pos gets
// Partitions[lo:hi], where lo and hi are pos*P/M and (pos+1)*P/M rounded
// half-up. Members whose run is empty are left out of the result, and with
// no members at all the result is an empty, non-nil map.
//
// Every returned slice has its capacity clamped to its length. Without this,
// a caller appending to one member's assignment would write into the shared
// backing array and overwrite the partitions handed to the next member.
func (info topicInfo) Ranges() map[string][]int32 {
	sort.Strings(info.MemberIDs)

	mlen := len(info.MemberIDs)
	plen := len(info.Partitions)
	res := make(map[string][]int32, mlen)

	for pos, memberID := range info.MemberIDs {
		n, i := float64(plen)/float64(mlen), float64(pos)
		lo := int(math.Floor(i*n + 0.5))
		hi := int(math.Floor((i+1)*n + 0.5))
		if lo < hi {
			// Full slice expression: cap == len, so appends reallocate.
			res[memberID] = info.Partitions[lo:hi:hi]
		}
	}
	return res
}
// RoundRobin deals the partitions out to the members one at a time.
//
// MemberIDs is sorted in place first (the backing array is shared with the
// caller's value), then the partition at index i goes to the member at
// sorted position i modulo the member count. Members that receive nothing
// are absent from the result.
//
// With no members the result is an empty, non-nil map, matching Ranges,
// instead of a divide-by-zero panic from the modulo.
func (info topicInfo) RoundRobin() map[string][]int32 {
	sort.Strings(info.MemberIDs)

	mlen := len(info.MemberIDs)
	res := make(map[string][]int32, mlen)
	if mlen == 0 {
		// Nobody to assign to; i%mlen below would panic.
		return res
	}

	for i, pnum := range info.Partitions {
		memberID := info.MemberIDs[i%mlen]
		res[memberID] = append(res[memberID], pnum)
	}
	return res
}
// --------------------------------------------------------------------
// balancer accumulates, per topic, the partitions and the registered member
// IDs, and turns them into per-member assignments via Perform.
type balancer struct {
// client is used to look up the partitions of a topic on first registration.
client sarama.Client
// topics maps a topic name to its partitions and registered members.
topics map[string]topicInfo
}
// newBalancerFromMeta creates a balancer and registers every member for each
// topic listed in its metadata. The first error reported while registering a
// topic aborts the construction and is returned with a nil balancer.
func newBalancerFromMeta(client sarama.Client, members map[string]sarama.ConsumerGroupMemberMetadata) (*balancer, error) {
	b := newBalancer(client)
	for memberID, meta := range members {
		for _, topic := range meta.Topics {
			err := b.Topic(topic, memberID)
			if err != nil {
				return nil, err
			}
		}
	}
	return b, nil
}
// newBalancer returns a balancer bound to client with no topics registered.
func newBalancer(client sarama.Client) *balancer {
	b := new(balancer)
	b.client = client
	b.topics = map[string]topicInfo{}
	return b
}
// Topic registers memberID for the topic called name.
//
// The first registration of a topic asks the client for its partitions; an
// error from that lookup is returned unchanged and nothing is recorded.
// Later registrations reuse the stored partitions and only append the member.
func (r *balancer) Topic(name string, memberID string) error {
	info, known := r.topics[name]
	if !known {
		partitions, err := r.client.Partitions(name)
		if err != nil {
			return err
		}
		info.Partitions = partitions
		info.MemberIDs = make([]string, 0, 1)
	}

	// topicInfo is stored by value, so the updated copy must be written back.
	info.MemberIDs = append(info.MemberIDs, memberID)
	r.topics[name] = info
	return nil
}
// Perform runs strategy s on every registered topic and regroups the
// outcome by member: the result maps member ID -> topic name -> partitions.
// Members that receive no partitions for any topic do not appear in it.
func (r *balancer) Perform(s Strategy) map[string]map[string][]int32 {
	assignments := make(map[string]map[string][]int32, 1)
	for topic, info := range r.topics {
		for memberID, partitions := range info.Perform(s) {
			byTopic, seen := assignments[memberID]
			if !seen {
				byTopic = make(map[string][]int32, 1)
				assignments[memberID] = byTopic
			}
			byTopic[topic] = partitions
		}
	}
	return assignments
}