blob: 9855bf44398d74974b19ade6bfc53177df39cb16 [file] [log] [blame]
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001package sarama
2
3import (
khenaidoo7d3c5582021-08-11 18:09:44 -04004 "container/heap"
5 "errors"
6 "fmt"
Holger Hildebrandtfa074992020-03-27 15:42:06 +00007 "math"
8 "sort"
khenaidoo7d3c5582021-08-11 18:09:44 -04009 "strings"
10)
11
const (
	// RangeBalanceStrategyName identifies strategies that use the range partition assignment strategy.
	RangeBalanceStrategyName = "range"

	// RoundRobinBalanceStrategyName identifies strategies that use the round-robin partition assignment strategy.
	RoundRobinBalanceStrategyName = "roundrobin"

	// StickyBalanceStrategyName identifies strategies that use the sticky-partition assignment strategy.
	StickyBalanceStrategyName = "sticky"

	// defaultGeneration is a negative sentinel generation number.
	// NOTE(review): presumably used when a member's user data carries no
	// generation; confirm against the user-data decoding code.
	defaultGeneration = -1
)
24
// BalanceStrategyPlan is the result of any BalanceStrategy.Plan attempt.
// It contains an allocation of topic/partitions by memberID in the form of
// a `memberID -> topic -> partitions` map.
type BalanceStrategyPlan map[string]map[string][]int32

// Add assigns a number of partitions of a topic to a member.
//
// Partitions are appended to whatever the member already holds for that
// topic. Calling Add without partitions is a no-op: no entry is created for
// the member or the topic.
func (p BalanceStrategyPlan) Add(memberID, topic string, partitions ...int32) {
	if len(partitions) == 0 {
		return
	}
	byTopic, ok := p[memberID]
	if !ok {
		byTopic = make(map[string][]int32, 1)
		p[memberID] = byTopic
	}
	byTopic[topic] = append(byTopic[topic], partitions...)
}
40
41// --------------------------------------------------------------------
42
43// BalanceStrategy is used to balance topics and partitions
44// across members of a consumer group
45type BalanceStrategy interface {
46 // Name uniquely identifies the strategy.
47 Name() string
48
49 // Plan accepts a map of `memberID -> metadata` and a map of `topic -> partitions`
50 // and returns a distribution plan.
51 Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error)
khenaidoo7d3c5582021-08-11 18:09:44 -040052
53 // AssignmentData returns the serialized assignment data for the specified
54 // memberID
55 AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error)
Holger Hildebrandtfa074992020-03-27 15:42:06 +000056}
57
58// --------------------------------------------------------------------
59
60// BalanceStrategyRange is the default and assigns partitions as ranges to consumer group members.
61// Example with one topic T with six partitions (0..5) and two members (M1, M2):
62// M1: {T: [0, 1, 2]}
63// M2: {T: [3, 4, 5]}
64var BalanceStrategyRange = &balanceStrategy{
khenaidoo7d3c5582021-08-11 18:09:44 -040065 name: RangeBalanceStrategyName,
Holger Hildebrandtfa074992020-03-27 15:42:06 +000066 coreFn: func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32) {
67 step := float64(len(partitions)) / float64(len(memberIDs))
68
69 for i, memberID := range memberIDs {
70 pos := float64(i)
71 min := int(math.Floor(pos*step + 0.5))
72 max := int(math.Floor((pos+1)*step + 0.5))
73 plan.Add(memberID, topic, partitions[min:max]...)
74 }
75 },
76}
77
khenaidoo7d3c5582021-08-11 18:09:44 -040078// BalanceStrategySticky assigns partitions to members with an attempt to preserve earlier assignments
79// while maintain a balanced partition distribution.
Holger Hildebrandtfa074992020-03-27 15:42:06 +000080// Example with topic T with six partitions (0..5) and two members (M1, M2):
81// M1: {T: [0, 2, 4]}
82// M2: {T: [1, 3, 5]}
khenaidoo7d3c5582021-08-11 18:09:44 -040083//
84// On reassignment with an additional consumer, you might get an assignment plan like:
85// M1: {T: [0, 2]}
86// M2: {T: [1, 3]}
87// M3: {T: [4, 5]}
88//
89var BalanceStrategySticky = &stickyBalanceStrategy{}
Holger Hildebrandtfa074992020-03-27 15:42:06 +000090
91// --------------------------------------------------------------------
92
93type balanceStrategy struct {
94 name string
95 coreFn func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32)
96}
97
98// Name implements BalanceStrategy.
99func (s *balanceStrategy) Name() string { return s.name }
100
101// Plan implements BalanceStrategy.
102func (s *balanceStrategy) Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {
103 // Build members by topic map
104 mbt := make(map[string][]string)
105 for memberID, meta := range members {
106 for _, topic := range meta.Topics {
107 mbt[topic] = append(mbt[topic], memberID)
108 }
109 }
110
111 // Sort members for each topic
112 for topic, memberIDs := range mbt {
113 sort.Sort(&balanceStrategySortable{
114 topic: topic,
115 memberIDs: memberIDs,
116 })
117 }
118
119 // Assemble plan
120 plan := make(BalanceStrategyPlan, len(members))
121 for topic, memberIDs := range mbt {
122 s.coreFn(plan, memberIDs, topic, topics[topic])
123 }
124 return plan, nil
125}
126
khenaidoo7d3c5582021-08-11 18:09:44 -0400127// AssignmentData simple strategies do not require any shared assignment data
128func (s *balanceStrategy) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) {
129 return nil, nil
130}
131
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000132type balanceStrategySortable struct {
133 topic string
134 memberIDs []string
135}
136
137func (p balanceStrategySortable) Len() int { return len(p.memberIDs) }
138func (p balanceStrategySortable) Swap(i, j int) {
139 p.memberIDs[i], p.memberIDs[j] = p.memberIDs[j], p.memberIDs[i]
140}
khenaidoo7d3c5582021-08-11 18:09:44 -0400141
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000142func (p balanceStrategySortable) Less(i, j int) bool {
143 return balanceStrategyHashValue(p.topic, p.memberIDs[i]) < balanceStrategyHashValue(p.topic, p.memberIDs[j])
144}
145
// balanceStrategyHashValue returns a 32-bit FNV-1a-style hash over the
// concatenation of all strings in vv (xor with the character, then multiply
// by 16777619, starting from 2166136261).
//
// Each string is walked rune by rune and the whole rune value is mixed in at
// once, so for non-ASCII input the result differs from a byte-wise FNV-1a.
// With no input the initial value 2166136261 is returned unchanged.
func balanceStrategyHashValue(vv ...string) uint32 {
	h := uint32(2166136261)
	for _, s := range vv {
		for _, c := range s {
			h ^= uint32(c)
			h *= 16777619
		}
	}
	return h
}
khenaidoo7d3c5582021-08-11 18:09:44 -0400156
157type stickyBalanceStrategy struct {
158 movements partitionMovements
159}
160
161// Name implements BalanceStrategy.
162func (s *stickyBalanceStrategy) Name() string { return StickyBalanceStrategyName }
163
164// Plan implements BalanceStrategy.
165func (s *stickyBalanceStrategy) Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {
166 // track partition movements during generation of the partition assignment plan
167 s.movements = partitionMovements{
168 Movements: make(map[topicPartitionAssignment]consumerPair),
169 PartitionMovementsByTopic: make(map[string]map[consumerPair]map[topicPartitionAssignment]bool),
170 }
171
172 // prepopulate the current assignment state from userdata on the consumer group members
173 currentAssignment, prevAssignment, err := prepopulateCurrentAssignments(members)
174 if err != nil {
175 return nil, err
176 }
177
178 // determine if we're dealing with a completely fresh assignment, or if there's existing assignment state
179 isFreshAssignment := false
180 if len(currentAssignment) == 0 {
181 isFreshAssignment = true
182 }
183
184 // create a mapping of all current topic partitions and the consumers that can be assigned to them
185 partition2AllPotentialConsumers := make(map[topicPartitionAssignment][]string)
186 for topic, partitions := range topics {
187 for _, partition := range partitions {
188 partition2AllPotentialConsumers[topicPartitionAssignment{Topic: topic, Partition: partition}] = []string{}
189 }
190 }
191
192 // create a mapping of all consumers to all potential topic partitions that can be assigned to them
193 // also, populate the mapping of partitions to potential consumers
194 consumer2AllPotentialPartitions := make(map[string][]topicPartitionAssignment, len(members))
195 for memberID, meta := range members {
196 consumer2AllPotentialPartitions[memberID] = make([]topicPartitionAssignment, 0)
197 for _, topicSubscription := range meta.Topics {
198 // only evaluate topic subscriptions that are present in the supplied topics map
199 if _, found := topics[topicSubscription]; found {
200 for _, partition := range topics[topicSubscription] {
201 topicPartition := topicPartitionAssignment{Topic: topicSubscription, Partition: partition}
202 consumer2AllPotentialPartitions[memberID] = append(consumer2AllPotentialPartitions[memberID], topicPartition)
203 partition2AllPotentialConsumers[topicPartition] = append(partition2AllPotentialConsumers[topicPartition], memberID)
204 }
205 }
206 }
207
208 // add this consumer to currentAssignment (with an empty topic partition assignment) if it does not already exist
209 if _, exists := currentAssignment[memberID]; !exists {
210 currentAssignment[memberID] = make([]topicPartitionAssignment, 0)
211 }
212 }
213
214 // create a mapping of each partition to its current consumer, where possible
215 currentPartitionConsumers := make(map[topicPartitionAssignment]string, len(currentAssignment))
216 unvisitedPartitions := make(map[topicPartitionAssignment]bool, len(partition2AllPotentialConsumers))
217 for partition := range partition2AllPotentialConsumers {
218 unvisitedPartitions[partition] = true
219 }
220 var unassignedPartitions []topicPartitionAssignment
221 for memberID, partitions := range currentAssignment {
222 var keepPartitions []topicPartitionAssignment
223 for _, partition := range partitions {
224 // If this partition no longer exists at all, likely due to the
225 // topic being deleted, we remove the partition from the member.
226 if _, exists := partition2AllPotentialConsumers[partition]; !exists {
227 continue
228 }
229 delete(unvisitedPartitions, partition)
230 currentPartitionConsumers[partition] = memberID
231
232 if !strsContains(members[memberID].Topics, partition.Topic) {
233 unassignedPartitions = append(unassignedPartitions, partition)
234 continue
235 }
236 keepPartitions = append(keepPartitions, partition)
237 }
238 currentAssignment[memberID] = keepPartitions
239 }
240 for unvisited := range unvisitedPartitions {
241 unassignedPartitions = append(unassignedPartitions, unvisited)
242 }
243
244 // sort the topic partitions in order of priority for reassignment
245 sortedPartitions := sortPartitions(currentAssignment, prevAssignment, isFreshAssignment, partition2AllPotentialConsumers, consumer2AllPotentialPartitions)
246
247 // at this point we have preserved all valid topic partition to consumer assignments and removed
248 // all invalid topic partitions and invalid consumers. Now we need to assign unassignedPartitions
249 // to consumers so that the topic partition assignments are as balanced as possible.
250
251 // an ascending sorted set of consumers based on how many topic partitions are already assigned to them
252 sortedCurrentSubscriptions := sortMemberIDsByPartitionAssignments(currentAssignment)
253 s.balance(currentAssignment, prevAssignment, sortedPartitions, unassignedPartitions, sortedCurrentSubscriptions, consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumers)
254
255 // Assemble plan
256 plan := make(BalanceStrategyPlan, len(currentAssignment))
257 for memberID, assignments := range currentAssignment {
258 if len(assignments) == 0 {
259 plan[memberID] = make(map[string][]int32)
260 } else {
261 for _, assignment := range assignments {
262 plan.Add(memberID, assignment.Topic, assignment.Partition)
263 }
264 }
265 }
266 return plan, nil
267}
268
269// AssignmentData serializes the set of topics currently assigned to the
270// specified member as part of the supplied balance plan
271func (s *stickyBalanceStrategy) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) {
272 return encode(&StickyAssignorUserDataV1{
273 Topics: topics,
274 Generation: generationID,
275 }, nil)
276}
277
// strsContains reports whether value is an element of s.
// A nil or empty slice contains nothing.
func strsContains(s []string, value string) bool {
	for _, entry := range s {
		if entry == value {
			return true
		}
	}
	return false
}
286
287// Balance assignments across consumers for maximum fairness and stickiness.
288func (s *stickyBalanceStrategy) balance(currentAssignment map[string][]topicPartitionAssignment, prevAssignment map[topicPartitionAssignment]consumerGenerationPair, sortedPartitions []topicPartitionAssignment, unassignedPartitions []topicPartitionAssignment, sortedCurrentSubscriptions []string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string, currentPartitionConsumer map[topicPartitionAssignment]string) {
289 initializing := false
290 if len(sortedCurrentSubscriptions) == 0 || len(currentAssignment[sortedCurrentSubscriptions[0]]) == 0 {
291 initializing = true
292 }
293
294 // assign all unassigned partitions
295 for _, partition := range unassignedPartitions {
296 // skip if there is no potential consumer for the partition
297 if len(partition2AllPotentialConsumers[partition]) == 0 {
298 continue
299 }
300 sortedCurrentSubscriptions = assignPartition(partition, sortedCurrentSubscriptions, currentAssignment, consumer2AllPotentialPartitions, currentPartitionConsumer)
301 }
302
303 // narrow down the reassignment scope to only those partitions that can actually be reassigned
304 for partition := range partition2AllPotentialConsumers {
305 if !canTopicPartitionParticipateInReassignment(partition, partition2AllPotentialConsumers) {
306 sortedPartitions = removeTopicPartitionFromMemberAssignments(sortedPartitions, partition)
307 }
308 }
309
310 // narrow down the reassignment scope to only those consumers that are subject to reassignment
311 fixedAssignments := make(map[string][]topicPartitionAssignment)
312 for memberID := range consumer2AllPotentialPartitions {
313 if !canConsumerParticipateInReassignment(memberID, currentAssignment, consumer2AllPotentialPartitions, partition2AllPotentialConsumers) {
314 fixedAssignments[memberID] = currentAssignment[memberID]
315 delete(currentAssignment, memberID)
316 sortedCurrentSubscriptions = sortMemberIDsByPartitionAssignments(currentAssignment)
317 }
318 }
319
320 // create a deep copy of the current assignment so we can revert to it if we do not get a more balanced assignment later
321 preBalanceAssignment := deepCopyAssignment(currentAssignment)
322 preBalancePartitionConsumers := make(map[topicPartitionAssignment]string, len(currentPartitionConsumer))
323 for k, v := range currentPartitionConsumer {
324 preBalancePartitionConsumers[k] = v
325 }
326
327 reassignmentPerformed := s.performReassignments(sortedPartitions, currentAssignment, prevAssignment, sortedCurrentSubscriptions, consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumer)
328
329 // if we are not preserving existing assignments and we have made changes to the current assignment
330 // make sure we are getting a more balanced assignment; otherwise, revert to previous assignment
331 if !initializing && reassignmentPerformed && getBalanceScore(currentAssignment) >= getBalanceScore(preBalanceAssignment) {
332 currentAssignment = deepCopyAssignment(preBalanceAssignment)
333 currentPartitionConsumer = make(map[topicPartitionAssignment]string, len(preBalancePartitionConsumers))
334 for k, v := range preBalancePartitionConsumers {
335 currentPartitionConsumer[k] = v
336 }
337 }
338
339 // add the fixed assignments (those that could not change) back
340 for consumer, assignments := range fixedAssignments {
341 currentAssignment[consumer] = assignments
342 }
343}
344
345// BalanceStrategyRoundRobin assigns partitions to members in alternating order.
346// For example, there are two topics (t0, t1) and two consumer (m0, m1), and each topic has three partitions (p0, p1, p2):
347// M0: [t0p0, t0p2, t1p1]
348// M1: [t0p1, t1p0, t1p2]
349var BalanceStrategyRoundRobin = new(roundRobinBalancer)
350
351type roundRobinBalancer struct{}
352
353func (b *roundRobinBalancer) Name() string {
354 return RoundRobinBalanceStrategyName
355}
356
357func (b *roundRobinBalancer) Plan(memberAndMetadata map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {
358 if len(memberAndMetadata) == 0 || len(topics) == 0 {
359 return nil, errors.New("members and topics are not provided")
360 }
361 // sort partitions
362 var topicPartitions []topicAndPartition
363 for topic, partitions := range topics {
364 for _, partition := range partitions {
365 topicPartitions = append(topicPartitions, topicAndPartition{topic: topic, partition: partition})
366 }
367 }
368 sort.SliceStable(topicPartitions, func(i, j int) bool {
369 pi := topicPartitions[i]
370 pj := topicPartitions[j]
371 return pi.comparedValue() < pj.comparedValue()
372 })
373
374 // sort members
375 var members []memberAndTopic
376 for memberID, meta := range memberAndMetadata {
377 m := memberAndTopic{
378 memberID: memberID,
379 topics: make(map[string]struct{}),
380 }
381 for _, t := range meta.Topics {
382 m.topics[t] = struct{}{}
383 }
384 members = append(members, m)
385 }
386 sort.SliceStable(members, func(i, j int) bool {
387 mi := members[i]
388 mj := members[j]
389 return mi.memberID < mj.memberID
390 })
391
392 // assign partitions
393 plan := make(BalanceStrategyPlan, len(members))
394 i := 0
395 n := len(members)
396 for _, tp := range topicPartitions {
397 m := members[i%n]
398 for !m.hasTopic(tp.topic) {
399 i++
400 m = members[i%n]
401 }
402 plan.Add(m.memberID, tp.topic, tp.partition)
403 i++
404 }
405 return plan, nil
406}
407
408func (b *roundRobinBalancer) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) {
409 return nil, nil // do nothing for now
410}
411
// topicAndPartition identifies a single partition of a topic.
type topicAndPartition struct {
	topic     string
	partition int32
}

// comparedValue returns the sort key "<topic>-<partition>".
// The key is a string, so anything comparing keys with < orders partition
// numbers lexicographically (e.g. "t-10" before "t-2").
func (tp *topicAndPartition) comparedValue() string {
	return fmt.Sprintf("%s-%d", tp.topic, tp.partition)
}
420
// memberAndTopic pairs a member ID with the set of topics it subscribes to.
type memberAndTopic struct {
	memberID string
	topics   map[string]struct{}
}

// hasTopic reports whether the member subscribes to topic.
func (m *memberAndTopic) hasTopic(topic string) bool {
	_, isExist := m.topics[topic]
	return isExist
}
430
431// Calculate the balance score of the given assignment, as the sum of assigned partitions size difference of all consumer pairs.
432// A perfectly balanced assignment (with all consumers getting the same number of partitions) has a balance score of 0.
433// Lower balance score indicates a more balanced assignment.
434func getBalanceScore(assignment map[string][]topicPartitionAssignment) int {
435 consumer2AssignmentSize := make(map[string]int, len(assignment))
436 for memberID, partitions := range assignment {
437 consumer2AssignmentSize[memberID] = len(partitions)
438 }
439
440 var score float64
441 for memberID, consumerAssignmentSize := range consumer2AssignmentSize {
442 delete(consumer2AssignmentSize, memberID)
443 for _, otherConsumerAssignmentSize := range consumer2AssignmentSize {
444 score += math.Abs(float64(consumerAssignmentSize - otherConsumerAssignmentSize))
445 }
446 }
447 return int(score)
448}
449
450// Determine whether the current assignment plan is balanced.
451func isBalanced(currentAssignment map[string][]topicPartitionAssignment, allSubscriptions map[string][]topicPartitionAssignment) bool {
452 sortedCurrentSubscriptions := sortMemberIDsByPartitionAssignments(currentAssignment)
453 min := len(currentAssignment[sortedCurrentSubscriptions[0]])
454 max := len(currentAssignment[sortedCurrentSubscriptions[len(sortedCurrentSubscriptions)-1]])
455 if min >= max-1 {
456 // if minimum and maximum numbers of partitions assigned to consumers differ by at most one return true
457 return true
458 }
459
460 // create a mapping from partitions to the consumer assigned to them
461 allPartitions := make(map[topicPartitionAssignment]string)
462 for memberID, partitions := range currentAssignment {
463 for _, partition := range partitions {
464 if _, exists := allPartitions[partition]; exists {
465 Logger.Printf("Topic %s Partition %d is assigned more than one consumer", partition.Topic, partition.Partition)
466 }
467 allPartitions[partition] = memberID
468 }
469 }
470
471 // for each consumer that does not have all the topic partitions it can get make sure none of the topic partitions it
472 // could but did not get cannot be moved to it (because that would break the balance)
473 for _, memberID := range sortedCurrentSubscriptions {
474 consumerPartitions := currentAssignment[memberID]
475 consumerPartitionCount := len(consumerPartitions)
476
477 // skip if this consumer already has all the topic partitions it can get
478 if consumerPartitionCount == len(allSubscriptions[memberID]) {
479 continue
480 }
481
482 // otherwise make sure it cannot get any more
483 potentialTopicPartitions := allSubscriptions[memberID]
484 for _, partition := range potentialTopicPartitions {
485 if !memberAssignmentsIncludeTopicPartition(currentAssignment[memberID], partition) {
486 otherConsumer := allPartitions[partition]
487 otherConsumerPartitionCount := len(currentAssignment[otherConsumer])
488 if consumerPartitionCount < otherConsumerPartitionCount {
489 return false
490 }
491 }
492 }
493 }
494 return true
495}
496
497// Reassign all topic partitions that need reassignment until balanced.
498func (s *stickyBalanceStrategy) performReassignments(reassignablePartitions []topicPartitionAssignment, currentAssignment map[string][]topicPartitionAssignment, prevAssignment map[topicPartitionAssignment]consumerGenerationPair, sortedCurrentSubscriptions []string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string, currentPartitionConsumer map[topicPartitionAssignment]string) bool {
499 reassignmentPerformed := false
500 modified := false
501
502 // repeat reassignment until no partition can be moved to improve the balance
503 for {
504 modified = false
505 // reassign all reassignable partitions (starting from the partition with least potential consumers and if needed)
506 // until the full list is processed or a balance is achieved
507 for _, partition := range reassignablePartitions {
508 if isBalanced(currentAssignment, consumer2AllPotentialPartitions) {
509 break
510 }
511
512 // the partition must have at least two consumers
513 if len(partition2AllPotentialConsumers[partition]) <= 1 {
514 Logger.Printf("Expected more than one potential consumer for partition %s topic %d", partition.Topic, partition.Partition)
515 }
516
517 // the partition must have a consumer
518 consumer := currentPartitionConsumer[partition]
519 if consumer == "" {
520 Logger.Printf("Expected topic %s partition %d to be assigned to a consumer", partition.Topic, partition.Partition)
521 }
522
523 if _, exists := prevAssignment[partition]; exists {
524 if len(currentAssignment[consumer]) > (len(currentAssignment[prevAssignment[partition].MemberID]) + 1) {
525 sortedCurrentSubscriptions = s.reassignPartition(partition, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer, prevAssignment[partition].MemberID)
526 reassignmentPerformed = true
527 modified = true
528 continue
529 }
530 }
531
532 // check if a better-suited consumer exists for the partition; if so, reassign it
533 for _, otherConsumer := range partition2AllPotentialConsumers[partition] {
534 if len(currentAssignment[consumer]) > (len(currentAssignment[otherConsumer]) + 1) {
535 sortedCurrentSubscriptions = s.reassignPartitionToNewConsumer(partition, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer, consumer2AllPotentialPartitions)
536 reassignmentPerformed = true
537 modified = true
538 break
539 }
540 }
541 }
542 if !modified {
543 return reassignmentPerformed
544 }
545 }
546}
547
548// Identify a new consumer for a topic partition and reassign it.
549func (s *stickyBalanceStrategy) reassignPartitionToNewConsumer(partition topicPartitionAssignment, currentAssignment map[string][]topicPartitionAssignment, sortedCurrentSubscriptions []string, currentPartitionConsumer map[topicPartitionAssignment]string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment) []string {
550 for _, anotherConsumer := range sortedCurrentSubscriptions {
551 if memberAssignmentsIncludeTopicPartition(consumer2AllPotentialPartitions[anotherConsumer], partition) {
552 return s.reassignPartition(partition, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer, anotherConsumer)
553 }
554 }
555 return sortedCurrentSubscriptions
556}
557
558// Reassign a specific partition to a new consumer
559func (s *stickyBalanceStrategy) reassignPartition(partition topicPartitionAssignment, currentAssignment map[string][]topicPartitionAssignment, sortedCurrentSubscriptions []string, currentPartitionConsumer map[topicPartitionAssignment]string, newConsumer string) []string {
560 consumer := currentPartitionConsumer[partition]
561 // find the correct partition movement considering the stickiness requirement
562 partitionToBeMoved := s.movements.getTheActualPartitionToBeMoved(partition, consumer, newConsumer)
563 return s.processPartitionMovement(partitionToBeMoved, newConsumer, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer)
564}
565
566// Track the movement of a topic partition after assignment
567func (s *stickyBalanceStrategy) processPartitionMovement(partition topicPartitionAssignment, newConsumer string, currentAssignment map[string][]topicPartitionAssignment, sortedCurrentSubscriptions []string, currentPartitionConsumer map[topicPartitionAssignment]string) []string {
568 oldConsumer := currentPartitionConsumer[partition]
569 s.movements.movePartition(partition, oldConsumer, newConsumer)
570
571 currentAssignment[oldConsumer] = removeTopicPartitionFromMemberAssignments(currentAssignment[oldConsumer], partition)
572 currentAssignment[newConsumer] = append(currentAssignment[newConsumer], partition)
573 currentPartitionConsumer[partition] = newConsumer
574 return sortMemberIDsByPartitionAssignments(currentAssignment)
575}
576
577// Determine whether a specific consumer should be considered for topic partition assignment.
578func canConsumerParticipateInReassignment(memberID string, currentAssignment map[string][]topicPartitionAssignment, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string) bool {
579 currentPartitions := currentAssignment[memberID]
580 currentAssignmentSize := len(currentPartitions)
581 maxAssignmentSize := len(consumer2AllPotentialPartitions[memberID])
582 if currentAssignmentSize > maxAssignmentSize {
583 Logger.Printf("The consumer %s is assigned more partitions than the maximum possible", memberID)
584 }
585 if currentAssignmentSize < maxAssignmentSize {
586 // if a consumer is not assigned all its potential partitions it is subject to reassignment
587 return true
588 }
589 for _, partition := range currentPartitions {
590 if canTopicPartitionParticipateInReassignment(partition, partition2AllPotentialConsumers) {
591 return true
592 }
593 }
594 return false
595}
596
597// Only consider reassigning those topic partitions that have two or more potential consumers.
598func canTopicPartitionParticipateInReassignment(partition topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string) bool {
599 return len(partition2AllPotentialConsumers[partition]) >= 2
600}
601
602// The assignment should improve the overall balance of the partition assignments to consumers.
603func assignPartition(partition topicPartitionAssignment, sortedCurrentSubscriptions []string, currentAssignment map[string][]topicPartitionAssignment, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, currentPartitionConsumer map[topicPartitionAssignment]string) []string {
604 for _, memberID := range sortedCurrentSubscriptions {
605 if memberAssignmentsIncludeTopicPartition(consumer2AllPotentialPartitions[memberID], partition) {
606 currentAssignment[memberID] = append(currentAssignment[memberID], partition)
607 currentPartitionConsumer[partition] = memberID
608 break
609 }
610 }
611 return sortMemberIDsByPartitionAssignments(currentAssignment)
612}
613
614// Deserialize topic partition assignment data to aid with creation of a sticky assignment.
615func deserializeTopicPartitionAssignment(userDataBytes []byte) (StickyAssignorUserData, error) {
616 userDataV1 := &StickyAssignorUserDataV1{}
617 if err := decode(userDataBytes, userDataV1); err != nil {
618 userDataV0 := &StickyAssignorUserDataV0{}
619 if err := decode(userDataBytes, userDataV0); err != nil {
620 return nil, err
621 }
622 return userDataV0, nil
623 }
624 return userDataV1, nil
625}
626
627// filterAssignedPartitions returns a map of consumer group members to their list of previously-assigned topic partitions, limited
628// to those topic partitions currently reported by the Kafka cluster.
629func filterAssignedPartitions(currentAssignment map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string) map[string][]topicPartitionAssignment {
630 assignments := deepCopyAssignment(currentAssignment)
631 for memberID, partitions := range assignments {
632 // perform in-place filtering
633 i := 0
634 for _, partition := range partitions {
635 if _, exists := partition2AllPotentialConsumers[partition]; exists {
636 partitions[i] = partition
637 i++
638 }
639 }
640 assignments[memberID] = partitions[:i]
641 }
642 return assignments
643}
644
645func removeTopicPartitionFromMemberAssignments(assignments []topicPartitionAssignment, topic topicPartitionAssignment) []topicPartitionAssignment {
646 for i, assignment := range assignments {
647 if assignment == topic {
648 return append(assignments[:i], assignments[i+1:]...)
649 }
650 }
651 return assignments
652}
653
654func memberAssignmentsIncludeTopicPartition(assignments []topicPartitionAssignment, topic topicPartitionAssignment) bool {
655 for _, assignment := range assignments {
656 if assignment == topic {
657 return true
658 }
659 }
660 return false
661}
662
663func sortPartitions(currentAssignment map[string][]topicPartitionAssignment, partitionsWithADifferentPreviousAssignment map[topicPartitionAssignment]consumerGenerationPair, isFreshAssignment bool, partition2AllPotentialConsumers map[topicPartitionAssignment][]string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment) []topicPartitionAssignment {
664 unassignedPartitions := make(map[topicPartitionAssignment]bool, len(partition2AllPotentialConsumers))
665 for partition := range partition2AllPotentialConsumers {
666 unassignedPartitions[partition] = true
667 }
668
669 sortedPartitions := make([]topicPartitionAssignment, 0)
670 if !isFreshAssignment && areSubscriptionsIdentical(partition2AllPotentialConsumers, consumer2AllPotentialPartitions) {
671 // if this is a reassignment and the subscriptions are identical (all consumers can consumer from all topics)
672 // then we just need to simply list partitions in a round robin fashion (from consumers with
673 // most assigned partitions to those with least)
674 assignments := filterAssignedPartitions(currentAssignment, partition2AllPotentialConsumers)
675
676 // use priority-queue to evaluate consumer group members in descending-order based on
677 // the number of topic partition assignments (i.e. consumers with most assignments first)
678 pq := make(assignmentPriorityQueue, len(assignments))
679 i := 0
680 for consumerID, consumerAssignments := range assignments {
681 pq[i] = &consumerGroupMember{
682 id: consumerID,
683 assignments: consumerAssignments,
684 }
685 i++
686 }
687 heap.Init(&pq)
688
689 for {
690 // loop until no consumer-group members remain
691 if pq.Len() == 0 {
692 break
693 }
694 member := pq[0]
695
696 // partitions that were assigned to a different consumer last time
697 var prevPartitionIndex int
698 for i, partition := range member.assignments {
699 if _, exists := partitionsWithADifferentPreviousAssignment[partition]; exists {
700 prevPartitionIndex = i
701 break
702 }
703 }
704
705 if len(member.assignments) > 0 {
706 partition := member.assignments[prevPartitionIndex]
707 sortedPartitions = append(sortedPartitions, partition)
708 delete(unassignedPartitions, partition)
709 if prevPartitionIndex == 0 {
710 member.assignments = member.assignments[1:]
711 } else {
712 member.assignments = append(member.assignments[:prevPartitionIndex], member.assignments[prevPartitionIndex+1:]...)
713 }
714 heap.Fix(&pq, 0)
715 } else {
716 heap.Pop(&pq)
717 }
718 }
719
720 for partition := range unassignedPartitions {
721 sortedPartitions = append(sortedPartitions, partition)
722 }
723 } else {
724 // an ascending sorted set of topic partitions based on how many consumers can potentially use them
725 sortedPartitions = sortPartitionsByPotentialConsumerAssignments(partition2AllPotentialConsumers)
726 }
727 return sortedPartitions
728}
729
730func sortMemberIDsByPartitionAssignments(assignments map[string][]topicPartitionAssignment) []string {
731 // sort the members by the number of partition assignments in ascending order
732 sortedMemberIDs := make([]string, 0, len(assignments))
733 for memberID := range assignments {
734 sortedMemberIDs = append(sortedMemberIDs, memberID)
735 }
736 sort.SliceStable(sortedMemberIDs, func(i, j int) bool {
737 ret := len(assignments[sortedMemberIDs[i]]) - len(assignments[sortedMemberIDs[j]])
738 if ret == 0 {
739 return sortedMemberIDs[i] < sortedMemberIDs[j]
740 }
741 return len(assignments[sortedMemberIDs[i]]) < len(assignments[sortedMemberIDs[j]])
742 })
743 return sortedMemberIDs
744}
745
746func sortPartitionsByPotentialConsumerAssignments(partition2AllPotentialConsumers map[topicPartitionAssignment][]string) []topicPartitionAssignment {
747 // sort the members by the number of partition assignments in descending order
748 sortedPartionIDs := make([]topicPartitionAssignment, len(partition2AllPotentialConsumers))
749 i := 0
750 for partition := range partition2AllPotentialConsumers {
751 sortedPartionIDs[i] = partition
752 i++
753 }
754 sort.Slice(sortedPartionIDs, func(i, j int) bool {
755 if len(partition2AllPotentialConsumers[sortedPartionIDs[i]]) == len(partition2AllPotentialConsumers[sortedPartionIDs[j]]) {
756 ret := strings.Compare(sortedPartionIDs[i].Topic, sortedPartionIDs[j].Topic)
757 if ret == 0 {
758 return sortedPartionIDs[i].Partition < sortedPartionIDs[j].Partition
759 }
760 return ret < 0
761 }
762 return len(partition2AllPotentialConsumers[sortedPartionIDs[i]]) < len(partition2AllPotentialConsumers[sortedPartionIDs[j]])
763 })
764 return sortedPartionIDs
765}
766
767func deepCopyAssignment(assignment map[string][]topicPartitionAssignment) map[string][]topicPartitionAssignment {
768 m := make(map[string][]topicPartitionAssignment, len(assignment))
769 for memberID, subscriptions := range assignment {
770 m[memberID] = append(subscriptions[:0:0], subscriptions...)
771 }
772 return m
773}
774
775func areSubscriptionsIdentical(partition2AllPotentialConsumers map[topicPartitionAssignment][]string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment) bool {
776 curMembers := make(map[string]int)
777 for _, cur := range partition2AllPotentialConsumers {
778 if len(curMembers) == 0 {
779 for _, curMembersElem := range cur {
780 curMembers[curMembersElem]++
781 }
782 continue
783 }
784
785 if len(curMembers) != len(cur) {
786 return false
787 }
788
789 yMap := make(map[string]int)
790 for _, yElem := range cur {
791 yMap[yElem]++
792 }
793
794 for curMembersMapKey, curMembersMapVal := range curMembers {
795 if yMap[curMembersMapKey] != curMembersMapVal {
796 return false
797 }
798 }
799 }
800
801 curPartitions := make(map[topicPartitionAssignment]int)
802 for _, cur := range consumer2AllPotentialPartitions {
803 if len(curPartitions) == 0 {
804 for _, curPartitionElem := range cur {
805 curPartitions[curPartitionElem]++
806 }
807 continue
808 }
809
810 if len(curPartitions) != len(cur) {
811 return false
812 }
813
814 yMap := make(map[topicPartitionAssignment]int)
815 for _, yElem := range cur {
816 yMap[yElem]++
817 }
818
819 for curMembersMapKey, curMembersMapVal := range curPartitions {
820 if yMap[curMembersMapKey] != curMembersMapVal {
821 return false
822 }
823 }
824 }
825 return true
826}
827
828// We need to process subscriptions' user data with each consumer's reported generation in mind
829// higher generations overwrite lower generations in case of a conflict
830// note that a conflict could exist only if user data is for different generations
831func prepopulateCurrentAssignments(members map[string]ConsumerGroupMemberMetadata) (map[string][]topicPartitionAssignment, map[topicPartitionAssignment]consumerGenerationPair, error) {
832 currentAssignment := make(map[string][]topicPartitionAssignment)
833 prevAssignment := make(map[topicPartitionAssignment]consumerGenerationPair)
834
835 // for each partition we create a sorted map of its consumers by generation
836 sortedPartitionConsumersByGeneration := make(map[topicPartitionAssignment]map[int]string)
837 for memberID, meta := range members {
838 consumerUserData, err := deserializeTopicPartitionAssignment(meta.UserData)
839 if err != nil {
840 return nil, nil, err
841 }
842 for _, partition := range consumerUserData.partitions() {
843 if consumers, exists := sortedPartitionConsumersByGeneration[partition]; exists {
844 if consumerUserData.hasGeneration() {
845 if _, generationExists := consumers[consumerUserData.generation()]; generationExists {
846 // same partition is assigned to two consumers during the same rebalance.
847 // log a warning and skip this record
848 Logger.Printf("Topic %s Partition %d is assigned to multiple consumers following sticky assignment generation %d", partition.Topic, partition.Partition, consumerUserData.generation())
849 continue
850 } else {
851 consumers[consumerUserData.generation()] = memberID
852 }
853 } else {
854 consumers[defaultGeneration] = memberID
855 }
856 } else {
857 generation := defaultGeneration
858 if consumerUserData.hasGeneration() {
859 generation = consumerUserData.generation()
860 }
861 sortedPartitionConsumersByGeneration[partition] = map[int]string{generation: memberID}
862 }
863 }
864 }
865
866 // prevAssignment holds the prior ConsumerGenerationPair (before current) of each partition
867 // current and previous consumers are the last two consumers of each partition in the above sorted map
868 for partition, consumers := range sortedPartitionConsumersByGeneration {
869 // sort consumers by generation in decreasing order
870 var generations []int
871 for generation := range consumers {
872 generations = append(generations, generation)
873 }
874 sort.Sort(sort.Reverse(sort.IntSlice(generations)))
875
876 consumer := consumers[generations[0]]
877 if _, exists := currentAssignment[consumer]; !exists {
878 currentAssignment[consumer] = []topicPartitionAssignment{partition}
879 } else {
880 currentAssignment[consumer] = append(currentAssignment[consumer], partition)
881 }
882
883 // check for previous assignment, if any
884 if len(generations) > 1 {
885 prevAssignment[partition] = consumerGenerationPair{
886 MemberID: consumers[generations[1]],
887 Generation: generations[1],
888 }
889 }
890 }
891 return currentAssignment, prevAssignment, nil
892}
893
// consumerGenerationPair couples a consumer group member with a generation
// number.
type consumerGenerationPair struct {
	// MemberID identifies the consumer group member.
	MemberID string
	// Generation is the generation number associated with MemberID.
	Generation int
}
898
// consumerPair names the two Kafka consumers involved in a partition
// reassignment: the partition moves from SrcMemberID to DstMemberID.
// The type is comparable and is used as a map key.
type consumerPair struct {
	// SrcMemberID is the consumer the partition moves away from.
	SrcMemberID string
	// DstMemberID is the consumer the partition moves to.
	DstMemberID string
}
904
905// partitionMovements maintains some data structures to simplify lookup of partition movements among consumers.
906type partitionMovements struct {
907 PartitionMovementsByTopic map[string]map[consumerPair]map[topicPartitionAssignment]bool
908 Movements map[topicPartitionAssignment]consumerPair
909}
910
911func (p *partitionMovements) removeMovementRecordOfPartition(partition topicPartitionAssignment) consumerPair {
912 pair := p.Movements[partition]
913 delete(p.Movements, partition)
914
915 partitionMovementsForThisTopic := p.PartitionMovementsByTopic[partition.Topic]
916 delete(partitionMovementsForThisTopic[pair], partition)
917 if len(partitionMovementsForThisTopic[pair]) == 0 {
918 delete(partitionMovementsForThisTopic, pair)
919 }
920 if len(p.PartitionMovementsByTopic[partition.Topic]) == 0 {
921 delete(p.PartitionMovementsByTopic, partition.Topic)
922 }
923 return pair
924}
925
926func (p *partitionMovements) addPartitionMovementRecord(partition topicPartitionAssignment, pair consumerPair) {
927 p.Movements[partition] = pair
928 if _, exists := p.PartitionMovementsByTopic[partition.Topic]; !exists {
929 p.PartitionMovementsByTopic[partition.Topic] = make(map[consumerPair]map[topicPartitionAssignment]bool)
930 }
931 partitionMovementsForThisTopic := p.PartitionMovementsByTopic[partition.Topic]
932 if _, exists := partitionMovementsForThisTopic[pair]; !exists {
933 partitionMovementsForThisTopic[pair] = make(map[topicPartitionAssignment]bool)
934 }
935 partitionMovementsForThisTopic[pair][partition] = true
936}
937
938func (p *partitionMovements) movePartition(partition topicPartitionAssignment, oldConsumer, newConsumer string) {
939 pair := consumerPair{
940 SrcMemberID: oldConsumer,
941 DstMemberID: newConsumer,
942 }
943 if _, exists := p.Movements[partition]; exists {
944 // this partition has previously moved
945 existingPair := p.removeMovementRecordOfPartition(partition)
946 if existingPair.DstMemberID != oldConsumer {
947 Logger.Printf("Existing pair DstMemberID %s was not equal to the oldConsumer ID %s", existingPair.DstMemberID, oldConsumer)
948 }
949 if existingPair.SrcMemberID != newConsumer {
950 // the partition is not moving back to its previous consumer
951 p.addPartitionMovementRecord(partition, consumerPair{
952 SrcMemberID: existingPair.SrcMemberID,
953 DstMemberID: newConsumer,
954 })
955 }
956 } else {
957 p.addPartitionMovementRecord(partition, pair)
958 }
959}
960
961func (p *partitionMovements) getTheActualPartitionToBeMoved(partition topicPartitionAssignment, oldConsumer, newConsumer string) topicPartitionAssignment {
962 if _, exists := p.PartitionMovementsByTopic[partition.Topic]; !exists {
963 return partition
964 }
965 if _, exists := p.Movements[partition]; exists {
966 // this partition has previously moved
967 if oldConsumer != p.Movements[partition].DstMemberID {
968 Logger.Printf("Partition movement DstMemberID %s was not equal to the oldConsumer ID %s", p.Movements[partition].DstMemberID, oldConsumer)
969 }
970 oldConsumer = p.Movements[partition].SrcMemberID
971 }
972
973 partitionMovementsForThisTopic := p.PartitionMovementsByTopic[partition.Topic]
974 reversePair := consumerPair{
975 SrcMemberID: newConsumer,
976 DstMemberID: oldConsumer,
977 }
978 if _, exists := partitionMovementsForThisTopic[reversePair]; !exists {
979 return partition
980 }
981 var reversePairPartition topicPartitionAssignment
982 for otherPartition := range partitionMovementsForThisTopic[reversePair] {
983 reversePairPartition = otherPartition
984 }
985 return reversePairPartition
986}
987
988func (p *partitionMovements) isLinked(src, dst string, pairs []consumerPair, currentPath []string) ([]string, bool) {
989 if src == dst {
990 return currentPath, false
991 }
992 if len(pairs) == 0 {
993 return currentPath, false
994 }
995 for _, pair := range pairs {
996 if src == pair.SrcMemberID && dst == pair.DstMemberID {
997 currentPath = append(currentPath, src, dst)
998 return currentPath, true
999 }
1000 }
1001
1002 for _, pair := range pairs {
1003 if pair.SrcMemberID == src {
1004 // create a deep copy of the pairs, excluding the current pair
1005 reducedSet := make([]consumerPair, len(pairs)-1)
1006 i := 0
1007 for _, p := range pairs {
1008 if p != pair {
1009 reducedSet[i] = pair
1010 i++
1011 }
1012 }
1013
1014 currentPath = append(currentPath, pair.SrcMemberID)
1015 return p.isLinked(pair.DstMemberID, dst, reducedSet, currentPath)
1016 }
1017 }
1018 return currentPath, false
1019}
1020
1021func (p *partitionMovements) in(cycle []string, cycles [][]string) bool {
1022 superCycle := make([]string, len(cycle)-1)
1023 for i := 0; i < len(cycle)-1; i++ {
1024 superCycle[i] = cycle[i]
1025 }
1026 superCycle = append(superCycle, cycle...)
1027 for _, foundCycle := range cycles {
1028 if len(foundCycle) == len(cycle) && indexOfSubList(superCycle, foundCycle) != -1 {
1029 return true
1030 }
1031 }
1032 return false
1033}
1034
1035func (p *partitionMovements) hasCycles(pairs []consumerPair) bool {
1036 cycles := make([][]string, 0)
1037 for _, pair := range pairs {
1038 // create a deep copy of the pairs, excluding the current pair
1039 reducedPairs := make([]consumerPair, len(pairs)-1)
1040 i := 0
1041 for _, p := range pairs {
1042 if p != pair {
1043 reducedPairs[i] = pair
1044 i++
1045 }
1046 }
1047 if path, linked := p.isLinked(pair.DstMemberID, pair.SrcMemberID, reducedPairs, []string{pair.SrcMemberID}); linked {
1048 if !p.in(path, cycles) {
1049 cycles = append(cycles, path)
1050 Logger.Printf("A cycle of length %d was found: %v", len(path)-1, path)
1051 }
1052 }
1053 }
1054
1055 // for now we want to make sure there is no partition movements of the same topic between a pair of consumers.
1056 // the odds of finding a cycle among more than two consumers seem to be very low (according to various randomized
1057 // tests with the given sticky algorithm) that it should not worth the added complexity of handling those cases.
1058 for _, cycle := range cycles {
1059 if len(cycle) == 3 {
1060 return true
1061 }
1062 }
1063 return false
1064}
1065
1066func (p *partitionMovements) isSticky() bool {
1067 for topic, movements := range p.PartitionMovementsByTopic {
1068 movementPairs := make([]consumerPair, len(movements))
1069 i := 0
1070 for pair := range movements {
1071 movementPairs[i] = pair
1072 i++
1073 }
1074 if p.hasCycles(movementPairs) {
1075 Logger.Printf("Stickiness is violated for topic %s", topic)
1076 Logger.Printf("Partition movements for this topic occurred among the following consumer pairs: %v", movements)
1077 return false
1078 }
1079 }
1080 return true
1081}
1082
// indexOfSubList returns the index of the first occurrence of target as a
// contiguous run inside source, or -1 if there is none. An empty target
// matches at index 0.
func indexOfSubList(source []string, target []string) int {
	width := len(target)
	for start := 0; start+width <= len(source); start++ {
		matched := true
		for offset, want := range target {
			if source[start+offset] != want {
				matched = false
				break
			}
		}
		if matched {
			return start
		}
	}
	return -1
}
1101
1102type consumerGroupMember struct {
1103 id string
1104 assignments []topicPartitionAssignment
1105}
1106
1107// assignmentPriorityQueue is a priority-queue of consumer group members that is sorted
1108// in descending order (most assignments to least assignments).
1109type assignmentPriorityQueue []*consumerGroupMember
1110
1111func (pq assignmentPriorityQueue) Len() int { return len(pq) }
1112
1113func (pq assignmentPriorityQueue) Less(i, j int) bool {
1114 // order asssignment priority queue in descending order using assignment-count/member-id
1115 if len(pq[i].assignments) == len(pq[j].assignments) {
1116 return strings.Compare(pq[i].id, pq[j].id) > 0
1117 }
1118 return len(pq[i].assignments) > len(pq[j].assignments)
1119}
1120
1121func (pq assignmentPriorityQueue) Swap(i, j int) {
1122 pq[i], pq[j] = pq[j], pq[i]
1123}
1124
1125func (pq *assignmentPriorityQueue) Push(x interface{}) {
1126 member := x.(*consumerGroupMember)
1127 *pq = append(*pq, member)
1128}
1129
1130func (pq *assignmentPriorityQueue) Pop() interface{} {
1131 old := *pq
1132 n := len(old)
1133 member := old[n-1]
1134 *pq = old[0 : n-1]
1135 return member
1136}