blob: 67c4d96d042d578539406785cec44ef073fdf3f4 [file] [log] [blame]
Scott Bakered4efab2020-01-13 19:12:25 -08001package sarama
2
3import (
4 "container/heap"
5 "math"
6 "sort"
7 "strings"
8)
9
const (
	// StickyBalanceStrategyName is the protocol name of the sticky strategy,
	// which tries to keep earlier partition assignments in place while keeping
	// the distribution balanced.
	StickyBalanceStrategyName = "sticky"

	// RoundRobinBalanceStrategyName is the protocol name of the round-robin
	// strategy, which deals partitions out to members in turn.
	RoundRobinBalanceStrategyName = "roundrobin"

	// RangeBalanceStrategyName is the protocol name of the range strategy,
	// which gives each member a contiguous run of every topic's partitions.
	RangeBalanceStrategyName = "range"
)

// defaultGeneration is the sentinel generation value -1.
// NOTE(review): not referenced in the visible part of this file; presumably it
// is the generation assumed for member user data that carries none — confirm.
const defaultGeneration = -1
22
// BalanceStrategyPlan is the outcome of a BalanceStrategy.Plan call. For every
// member ID it lists, per topic, the partitions assigned to that member:
// memberID -> topic -> partitions.
type BalanceStrategyPlan map[string]map[string][]int32

// Add appends the given partitions of topic to memberID's assignment. The
// member's topic map is created on first use. Calling Add without any
// partitions is a no-op and does not create an entry for the member.
func (p BalanceStrategyPlan) Add(memberID, topic string, partitions ...int32) {
	if len(partitions) == 0 {
		return
	}

	byTopic, found := p[memberID]
	if !found {
		byTopic = make(map[string][]int32, 1)
		p[memberID] = byTopic
	}
	byTopic[topic] = append(byTopic[topic], partitions...)
}
38
39// --------------------------------------------------------------------
40
41// BalanceStrategy is used to balance topics and partitions
42// across members of a consumer group
43type BalanceStrategy interface {
44 // Name uniquely identifies the strategy.
45 Name() string
46
47 // Plan accepts a map of `memberID -> metadata` and a map of `topic -> partitions`
48 // and returns a distribution plan.
49 Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error)
50}
51
52// --------------------------------------------------------------------
53
54// BalanceStrategyRange is the default and assigns partitions as ranges to consumer group members.
55// Example with one topic T with six partitions (0..5) and two members (M1, M2):
56// M1: {T: [0, 1, 2]}
57// M2: {T: [3, 4, 5]}
58var BalanceStrategyRange = &balanceStrategy{
59 name: RangeBalanceStrategyName,
60 coreFn: func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32) {
61 step := float64(len(partitions)) / float64(len(memberIDs))
62
63 for i, memberID := range memberIDs {
64 pos := float64(i)
65 min := int(math.Floor(pos*step + 0.5))
66 max := int(math.Floor((pos+1)*step + 0.5))
67 plan.Add(memberID, topic, partitions[min:max]...)
68 }
69 },
70}
71
72// BalanceStrategyRoundRobin assigns partitions to members in alternating order.
73// Example with topic T with six partitions (0..5) and two members (M1, M2):
74// M1: {T: [0, 2, 4]}
75// M2: {T: [1, 3, 5]}
76var BalanceStrategyRoundRobin = &balanceStrategy{
77 name: RoundRobinBalanceStrategyName,
78 coreFn: func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32) {
79 for i, part := range partitions {
80 memberID := memberIDs[i%len(memberIDs)]
81 plan.Add(memberID, topic, part)
82 }
83 },
84}
85
86// BalanceStrategySticky assigns partitions to members with an attempt to preserve earlier assignments
87// while maintain a balanced partition distribution.
88// Example with topic T with six partitions (0..5) and two members (M1, M2):
89// M1: {T: [0, 2, 4]}
90// M2: {T: [1, 3, 5]}
91//
92// On reassignment with an additional consumer, you might get an assignment plan like:
93// M1: {T: [0, 2]}
94// M2: {T: [1, 3]}
95// M3: {T: [4, 5]}
96//
97var BalanceStrategySticky = &stickyBalanceStrategy{}
98
99// --------------------------------------------------------------------
100
101type balanceStrategy struct {
102 name string
103 coreFn func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32)
104}
105
106// Name implements BalanceStrategy.
107func (s *balanceStrategy) Name() string { return s.name }
108
109// Plan implements BalanceStrategy.
110func (s *balanceStrategy) Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {
111 // Build members by topic map
112 mbt := make(map[string][]string)
113 for memberID, meta := range members {
114 for _, topic := range meta.Topics {
115 mbt[topic] = append(mbt[topic], memberID)
116 }
117 }
118
119 // Sort members for each topic
120 for topic, memberIDs := range mbt {
121 sort.Sort(&balanceStrategySortable{
122 topic: topic,
123 memberIDs: memberIDs,
124 })
125 }
126
127 // Assemble plan
128 plan := make(BalanceStrategyPlan, len(members))
129 for topic, memberIDs := range mbt {
130 s.coreFn(plan, memberIDs, topic, topics[topic])
131 }
132 return plan, nil
133}
134
// balanceStrategySortable implements sort.Interface over the member IDs
// subscribed to one topic. Members are ordered by ascending
// balanceStrategyHashValue(topic, memberID). Swap reorders memberIDs in place.
type balanceStrategySortable struct {
	topic     string
	memberIDs []string
}

func (p balanceStrategySortable) Len() int {
	return len(p.memberIDs)
}

func (p balanceStrategySortable) Swap(i, j int) {
	ids := p.memberIDs
	ids[i], ids[j] = ids[j], ids[i]
}

func (p balanceStrategySortable) Less(i, j int) bool {
	left := balanceStrategyHashValue(p.topic, p.memberIDs[i])
	right := balanceStrategyHashValue(p.topic, p.memberIDs[j])
	return left < right
}

// balanceStrategyHashValue hashes the strings in vv, in order, with the 32-bit
// FNV-1a offset basis and prime. It mixes in each Unicode code point produced
// by ranging over a string, not each byte. The result therefore matches
// byte-wise FNV-1a only for ASCII input.
func balanceStrategyHashValue(vv ...string) uint32 {
	const (
		fnvOffsetBasis uint32 = 2166136261
		fnvPrime       uint32 = 16777619
	)
	hash := fnvOffsetBasis
	for i := range vv {
		for _, r := range vv[i] {
			hash = (hash ^ uint32(r)) * fnvPrime
		}
	}
	return hash
}
158
159type stickyBalanceStrategy struct {
160 movements partitionMovements
161}
162
163// Name implements BalanceStrategy.
164func (s *stickyBalanceStrategy) Name() string { return StickyBalanceStrategyName }
165
// Plan implements BalanceStrategy.
//
// It builds a sticky assignment. The current state is seeded from the members'
// UserData via prepopulateCurrentAssignments. Owned partitions that still exist
// and are still subscribed to stay with their member. Everything else is handed
// to s.balance, which assigns and rebalances.
// The returned plan has an entry (possibly an empty topic map) for every member
// left in currentAssignment after balancing. An error is returned only when
// prepopulateCurrentAssignments fails.
//
// NOTE(review): s.movements is overwritten here, so concurrent Plan calls on the
// same *stickyBalanceStrategy (e.g. the shared BalanceStrategySticky) race.
func (s *stickyBalanceStrategy) Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {
	// track partition movements during generation of the partition assignment plan
	s.movements = partitionMovements{
		Movements:                 make(map[topicPartitionAssignment]consumerPair),
		PartitionMovementsByTopic: make(map[string]map[consumerPair]map[topicPartitionAssignment]bool),
	}

	// prepopulate the current assignment state from userdata on the consumer group members
	currentAssignment, prevAssignment, err := prepopulateCurrentAssignments(members)
	if err != nil {
		return nil, err
	}

	// determine if we're dealing with a completely fresh assignment, or if there's existing assignment state
	// (checked before empty entries are added for every member below)
	isFreshAssignment := false
	if len(currentAssignment) == 0 {
		isFreshAssignment = true
	}

	// create a mapping of all current topic partitions and the consumers that can be assigned to them
	// (every known partition gets an entry, even if no member subscribes to its topic)
	partition2AllPotentialConsumers := make(map[topicPartitionAssignment][]string)
	for topic, partitions := range topics {
		for _, partition := range partitions {
			partition2AllPotentialConsumers[topicPartitionAssignment{Topic: topic, Partition: partition}] = []string{}
		}
	}

	// create a mapping of all consumers to all potential topic partitions that can be assigned to them
	// also, populate the mapping of partitions to potential consumers
	consumer2AllPotentialPartitions := make(map[string][]topicPartitionAssignment, len(members))
	for memberID, meta := range members {
		consumer2AllPotentialPartitions[memberID] = make([]topicPartitionAssignment, 0)
		for _, topicSubscription := range meta.Topics {
			// only evaluate topic subscriptions that are present in the supplied topics map
			if _, found := topics[topicSubscription]; found {
				for _, partition := range topics[topicSubscription] {
					topicPartition := topicPartitionAssignment{Topic: topicSubscription, Partition: partition}
					consumer2AllPotentialPartitions[memberID] = append(consumer2AllPotentialPartitions[memberID], topicPartition)
					partition2AllPotentialConsumers[topicPartition] = append(partition2AllPotentialConsumers[topicPartition], memberID)
				}
			}
		}

		// add this consumer to currentAssignment (with an empty topic partition assignment) if it does not already exist
		if _, exists := currentAssignment[memberID]; !exists {
			currentAssignment[memberID] = make([]topicPartitionAssignment, 0)
		}
	}

	// create a mapping of each partition to its current consumer, where possible
	currentPartitionConsumers := make(map[topicPartitionAssignment]string, len(currentAssignment))
	// partitions that no member currently owns remain in this set and are treated as unassigned below
	unvisitedPartitions := make(map[topicPartitionAssignment]bool, len(partition2AllPotentialConsumers))
	for partition := range partition2AllPotentialConsumers {
		unvisitedPartitions[partition] = true
	}
	var unassignedPartitions []topicPartitionAssignment
	for memberID, partitions := range currentAssignment {
		var keepPartitions []topicPartitionAssignment
		for _, partition := range partitions {
			// If this partition no longer exists at all, likely due to the
			// topic being deleted, we remove the partition from the member.
			if _, exists := partition2AllPotentialConsumers[partition]; !exists {
				continue
			}
			delete(unvisitedPartitions, partition)
			// recorded even if the partition is released just below; assignPartition
			// (called from balance) overwrites it when it finds an eligible consumer
			currentPartitionConsumers[partition] = memberID

			// the member no longer subscribes to this topic: release the partition for reassignment
			if !strsContains(members[memberID].Topics, partition.Topic) {
				unassignedPartitions = append(unassignedPartitions, partition)
				continue
			}
			keepPartitions = append(keepPartitions, partition)
		}
		currentAssignment[memberID] = keepPartitions
	}
	for unvisited := range unvisitedPartitions {
		unassignedPartitions = append(unassignedPartitions, unvisited)
	}

	// sort the topic partitions in order of priority for reassignment
	sortedPartitions := sortPartitions(currentAssignment, prevAssignment, isFreshAssignment, partition2AllPotentialConsumers, consumer2AllPotentialPartitions)

	// at this point we have preserved all valid topic partition to consumer assignments and removed
	// all invalid topic partitions and invalid consumers. Now we need to assign unassignedPartitions
	// to consumers so that the topic partition assignments are as balanced as possible.

	// an ascending sorted set of consumers based on how many topic partitions are already assigned to them
	sortedCurrentSubscriptions := sortMemberIDsByPartitionAssignments(currentAssignment)
	// balance mutates currentAssignment and currentPartitionConsumers
	s.balance(currentAssignment, prevAssignment, sortedPartitions, unassignedPartitions, sortedCurrentSubscriptions, consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumers)

	// Assemble plan; members without partitions still get an (empty) entry
	plan := make(BalanceStrategyPlan, len(currentAssignment))
	for memberID, assignments := range currentAssignment {
		if len(assignments) == 0 {
			plan[memberID] = make(map[string][]int32, 0)
		} else {
			for _, assignment := range assignments {
				plan.Add(memberID, assignment.Topic, assignment.Partition)
			}
		}
	}
	return plan, nil
}
270
// strsContains reports whether value is an element of s.
func strsContains(s []string, value string) bool {
	for i := 0; i < len(s); i++ {
		if s[i] == value {
			return true
		}
	}
	return false
}
279
280// Balance assignments across consumers for maximum fairness and stickiness.
281func (s *stickyBalanceStrategy) balance(currentAssignment map[string][]topicPartitionAssignment, prevAssignment map[topicPartitionAssignment]consumerGenerationPair, sortedPartitions []topicPartitionAssignment, unassignedPartitions []topicPartitionAssignment, sortedCurrentSubscriptions []string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string, currentPartitionConsumer map[topicPartitionAssignment]string) {
282 initializing := false
283 if len(sortedCurrentSubscriptions) == 0 || len(currentAssignment[sortedCurrentSubscriptions[0]]) == 0 {
284 initializing = true
285 }
286
287 // assign all unassigned partitions
288 for _, partition := range unassignedPartitions {
289 // skip if there is no potential consumer for the partition
290 if len(partition2AllPotentialConsumers[partition]) == 0 {
291 continue
292 }
293 sortedCurrentSubscriptions = assignPartition(partition, sortedCurrentSubscriptions, currentAssignment, consumer2AllPotentialPartitions, currentPartitionConsumer)
294 }
295
296 // narrow down the reassignment scope to only those partitions that can actually be reassigned
297 for partition := range partition2AllPotentialConsumers {
298 if !canTopicPartitionParticipateInReassignment(partition, partition2AllPotentialConsumers) {
299 sortedPartitions = removeTopicPartitionFromMemberAssignments(sortedPartitions, partition)
300 }
301 }
302
303 // narrow down the reassignment scope to only those consumers that are subject to reassignment
304 fixedAssignments := make(map[string][]topicPartitionAssignment)
305 for memberID := range consumer2AllPotentialPartitions {
306 if !canConsumerParticipateInReassignment(memberID, currentAssignment, consumer2AllPotentialPartitions, partition2AllPotentialConsumers) {
307 fixedAssignments[memberID] = currentAssignment[memberID]
308 delete(currentAssignment, memberID)
309 sortedCurrentSubscriptions = sortMemberIDsByPartitionAssignments(currentAssignment)
310 }
311 }
312
313 // create a deep copy of the current assignment so we can revert to it if we do not get a more balanced assignment later
314 preBalanceAssignment := deepCopyAssignment(currentAssignment)
315 preBalancePartitionConsumers := make(map[topicPartitionAssignment]string, len(currentPartitionConsumer))
316 for k, v := range currentPartitionConsumer {
317 preBalancePartitionConsumers[k] = v
318 }
319
320 reassignmentPerformed := s.performReassignments(sortedPartitions, currentAssignment, prevAssignment, sortedCurrentSubscriptions, consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumer)
321
322 // if we are not preserving existing assignments and we have made changes to the current assignment
323 // make sure we are getting a more balanced assignment; otherwise, revert to previous assignment
324 if !initializing && reassignmentPerformed && getBalanceScore(currentAssignment) >= getBalanceScore(preBalanceAssignment) {
325 currentAssignment = deepCopyAssignment(preBalanceAssignment)
326 currentPartitionConsumer = make(map[topicPartitionAssignment]string, len(preBalancePartitionConsumers))
327 for k, v := range preBalancePartitionConsumers {
328 currentPartitionConsumer[k] = v
329 }
330 }
331
332 // add the fixed assignments (those that could not change) back
333 for consumer, assignments := range fixedAssignments {
334 currentAssignment[consumer] = assignments
335 }
336}
337
338// Calculate the balance score of the given assignment, as the sum of assigned partitions size difference of all consumer pairs.
339// A perfectly balanced assignment (with all consumers getting the same number of partitions) has a balance score of 0.
340// Lower balance score indicates a more balanced assignment.
341func getBalanceScore(assignment map[string][]topicPartitionAssignment) int {
342 consumer2AssignmentSize := make(map[string]int, len(assignment))
343 for memberID, partitions := range assignment {
344 consumer2AssignmentSize[memberID] = len(partitions)
345 }
346
347 var score float64
348 for memberID, consumerAssignmentSize := range consumer2AssignmentSize {
349 delete(consumer2AssignmentSize, memberID)
350 for _, otherConsumerAssignmentSize := range consumer2AssignmentSize {
351 score += math.Abs(float64(consumerAssignmentSize - otherConsumerAssignmentSize))
352 }
353 }
354 return int(score)
355}
356
357// Determine whether the current assignment plan is balanced.
358func isBalanced(currentAssignment map[string][]topicPartitionAssignment, sortedCurrentSubscriptions []string, allSubscriptions map[string][]topicPartitionAssignment) bool {
359 sortedCurrentSubscriptions = sortMemberIDsByPartitionAssignments(currentAssignment)
360 min := len(currentAssignment[sortedCurrentSubscriptions[0]])
361 max := len(currentAssignment[sortedCurrentSubscriptions[len(sortedCurrentSubscriptions)-1]])
362 if min >= max-1 {
363 // if minimum and maximum numbers of partitions assigned to consumers differ by at most one return true
364 return true
365 }
366
367 // create a mapping from partitions to the consumer assigned to them
368 allPartitions := make(map[topicPartitionAssignment]string)
369 for memberID, partitions := range currentAssignment {
370 for _, partition := range partitions {
371 if _, exists := allPartitions[partition]; exists {
372 Logger.Printf("Topic %s Partition %d is assigned more than one consumer", partition.Topic, partition.Partition)
373 }
374 allPartitions[partition] = memberID
375 }
376 }
377
378 // for each consumer that does not have all the topic partitions it can get make sure none of the topic partitions it
379 // could but did not get cannot be moved to it (because that would break the balance)
380 for _, memberID := range sortedCurrentSubscriptions {
381 consumerPartitions := currentAssignment[memberID]
382 consumerPartitionCount := len(consumerPartitions)
383
384 // skip if this consumer already has all the topic partitions it can get
385 if consumerPartitionCount == len(allSubscriptions[memberID]) {
386 continue
387 }
388
389 // otherwise make sure it cannot get any more
390 potentialTopicPartitions := allSubscriptions[memberID]
391 for _, partition := range potentialTopicPartitions {
392 if !memberAssignmentsIncludeTopicPartition(currentAssignment[memberID], partition) {
393 otherConsumer := allPartitions[partition]
394 otherConsumerPartitionCount := len(currentAssignment[otherConsumer])
395 if consumerPartitionCount < otherConsumerPartitionCount {
396 return false
397 }
398 }
399 }
400 }
401 return true
402}
403
404// Reassign all topic partitions that need reassignment until balanced.
405func (s *stickyBalanceStrategy) performReassignments(reassignablePartitions []topicPartitionAssignment, currentAssignment map[string][]topicPartitionAssignment, prevAssignment map[topicPartitionAssignment]consumerGenerationPair, sortedCurrentSubscriptions []string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string, currentPartitionConsumer map[topicPartitionAssignment]string) bool {
406 reassignmentPerformed := false
407 modified := false
408
409 // repeat reassignment until no partition can be moved to improve the balance
410 for {
411 modified = false
412 // reassign all reassignable partitions (starting from the partition with least potential consumers and if needed)
413 // until the full list is processed or a balance is achieved
414 for _, partition := range reassignablePartitions {
415 if isBalanced(currentAssignment, sortedCurrentSubscriptions, consumer2AllPotentialPartitions) {
416 break
417 }
418
419 // the partition must have at least two consumers
420 if len(partition2AllPotentialConsumers[partition]) <= 1 {
421 Logger.Printf("Expected more than one potential consumer for partition %s topic %d", partition.Topic, partition.Partition)
422 }
423
424 // the partition must have a consumer
425 consumer := currentPartitionConsumer[partition]
426 if consumer == "" {
427 Logger.Printf("Expected topic %s partition %d to be assigned to a consumer", partition.Topic, partition.Partition)
428 }
429
430 if _, exists := prevAssignment[partition]; exists {
431 if len(currentAssignment[consumer]) > (len(currentAssignment[prevAssignment[partition].MemberID]) + 1) {
432 sortedCurrentSubscriptions = s.reassignPartition(partition, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer, prevAssignment[partition].MemberID)
433 reassignmentPerformed = true
434 modified = true
435 continue
436 }
437 }
438
439 // check if a better-suited consumer exists for the partition; if so, reassign it
440 for _, otherConsumer := range partition2AllPotentialConsumers[partition] {
441 if len(currentAssignment[consumer]) > (len(currentAssignment[otherConsumer]) + 1) {
442 sortedCurrentSubscriptions = s.reassignPartitionToNewConsumer(partition, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer, consumer2AllPotentialPartitions)
443 reassignmentPerformed = true
444 modified = true
445 break
446 }
447 }
448 }
449 if !modified {
450 return reassignmentPerformed
451 }
452 }
453}
454
455// Identify a new consumer for a topic partition and reassign it.
456func (s *stickyBalanceStrategy) reassignPartitionToNewConsumer(partition topicPartitionAssignment, currentAssignment map[string][]topicPartitionAssignment, sortedCurrentSubscriptions []string, currentPartitionConsumer map[topicPartitionAssignment]string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment) []string {
457 for _, anotherConsumer := range sortedCurrentSubscriptions {
458 if memberAssignmentsIncludeTopicPartition(consumer2AllPotentialPartitions[anotherConsumer], partition) {
459 return s.reassignPartition(partition, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer, anotherConsumer)
460 }
461 }
462 return sortedCurrentSubscriptions
463}
464
465// Reassign a specific partition to a new consumer
466func (s *stickyBalanceStrategy) reassignPartition(partition topicPartitionAssignment, currentAssignment map[string][]topicPartitionAssignment, sortedCurrentSubscriptions []string, currentPartitionConsumer map[topicPartitionAssignment]string, newConsumer string) []string {
467 consumer := currentPartitionConsumer[partition]
468 // find the correct partition movement considering the stickiness requirement
469 partitionToBeMoved := s.movements.getTheActualPartitionToBeMoved(partition, consumer, newConsumer)
470 return s.processPartitionMovement(partitionToBeMoved, newConsumer, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer)
471}
472
473// Track the movement of a topic partition after assignment
474func (s *stickyBalanceStrategy) processPartitionMovement(partition topicPartitionAssignment, newConsumer string, currentAssignment map[string][]topicPartitionAssignment, sortedCurrentSubscriptions []string, currentPartitionConsumer map[topicPartitionAssignment]string) []string {
475 oldConsumer := currentPartitionConsumer[partition]
476 s.movements.movePartition(partition, oldConsumer, newConsumer)
477
478 currentAssignment[oldConsumer] = removeTopicPartitionFromMemberAssignments(currentAssignment[oldConsumer], partition)
479 currentAssignment[newConsumer] = append(currentAssignment[newConsumer], partition)
480 currentPartitionConsumer[partition] = newConsumer
481 return sortMemberIDsByPartitionAssignments(currentAssignment)
482}
483
484// Determine whether a specific consumer should be considered for topic partition assignment.
485func canConsumerParticipateInReassignment(memberID string, currentAssignment map[string][]topicPartitionAssignment, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string) bool {
486 currentPartitions := currentAssignment[memberID]
487 currentAssignmentSize := len(currentPartitions)
488 maxAssignmentSize := len(consumer2AllPotentialPartitions[memberID])
489 if currentAssignmentSize > maxAssignmentSize {
490 Logger.Printf("The consumer %s is assigned more partitions than the maximum possible", memberID)
491 }
492 if currentAssignmentSize < maxAssignmentSize {
493 // if a consumer is not assigned all its potential partitions it is subject to reassignment
494 return true
495 }
496 for _, partition := range currentPartitions {
497 if canTopicPartitionParticipateInReassignment(partition, partition2AllPotentialConsumers) {
498 return true
499 }
500 }
501 return false
502}
503
504// Only consider reassigning those topic partitions that have two or more potential consumers.
505func canTopicPartitionParticipateInReassignment(partition topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string) bool {
506 return len(partition2AllPotentialConsumers[partition]) >= 2
507}
508
509// The assignment should improve the overall balance of the partition assignments to consumers.
510func assignPartition(partition topicPartitionAssignment, sortedCurrentSubscriptions []string, currentAssignment map[string][]topicPartitionAssignment, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, currentPartitionConsumer map[topicPartitionAssignment]string) []string {
511 for _, memberID := range sortedCurrentSubscriptions {
512 if memberAssignmentsIncludeTopicPartition(consumer2AllPotentialPartitions[memberID], partition) {
513 currentAssignment[memberID] = append(currentAssignment[memberID], partition)
514 currentPartitionConsumer[partition] = memberID
515 break
516 }
517 }
518 return sortMemberIDsByPartitionAssignments(currentAssignment)
519}
520
521// Deserialize topic partition assignment data to aid with creation of a sticky assignment.
522func deserializeTopicPartitionAssignment(userDataBytes []byte) (StickyAssignorUserData, error) {
523 userDataV1 := &StickyAssignorUserDataV1{}
524 if err := decode(userDataBytes, userDataV1); err != nil {
525 userDataV0 := &StickyAssignorUserDataV0{}
526 if err := decode(userDataBytes, userDataV0); err != nil {
527 return nil, err
528 }
529 return userDataV0, nil
530 }
531 return userDataV1, nil
532}
533
534// filterAssignedPartitions returns a map of consumer group members to their list of previously-assigned topic partitions, limited
535// to those topic partitions currently reported by the Kafka cluster.
536func filterAssignedPartitions(currentAssignment map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string) map[string][]topicPartitionAssignment {
537 assignments := deepCopyAssignment(currentAssignment)
538 for memberID, partitions := range assignments {
539 // perform in-place filtering
540 i := 0
541 for _, partition := range partitions {
542 if _, exists := partition2AllPotentialConsumers[partition]; exists {
543 partitions[i] = partition
544 i++
545 }
546 }
547 assignments[memberID] = partitions[:i]
548 }
549 return assignments
550}
551
552func removeTopicPartitionFromMemberAssignments(assignments []topicPartitionAssignment, topic topicPartitionAssignment) []topicPartitionAssignment {
553 for i, assignment := range assignments {
554 if assignment == topic {
555 return append(assignments[:i], assignments[i+1:]...)
556 }
557 }
558 return assignments
559}
560
561func memberAssignmentsIncludeTopicPartition(assignments []topicPartitionAssignment, topic topicPartitionAssignment) bool {
562 for _, assignment := range assignments {
563 if assignment == topic {
564 return true
565 }
566 }
567 return false
568}
569
// sortPartitions returns every partition known to partition2AllPotentialConsumers,
// ordered by priority for reassignment.
//
// When this is not a fresh assignment and every member has identical
// subscriptions (areSubscriptionsIdentical), partitions are taken one at a time
// from the member at the root of a priority queue. Per the original comment,
// that is the member with the most remaining assignments; assignmentPriorityQueue's
// ordering is defined outside this view. For each member, a partition that
// appears in partitionsWithADifferentPreviousAssignment is preferred;
// otherwise its first remaining partition is taken. Partitions that no member
// currently owns are appended at the end, in map iteration order.
//
// Otherwise partitions are returned by ascending number of potential consumers
// (sortPartitionsByPotentialConsumerAssignments).
//
// currentAssignment is not modified: the heap works on the copy returned by
// filterAssignedPartitions.
func sortPartitions(currentAssignment map[string][]topicPartitionAssignment, partitionsWithADifferentPreviousAssignment map[topicPartitionAssignment]consumerGenerationPair, isFreshAssignment bool, partition2AllPotentialConsumers map[topicPartitionAssignment][]string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment) []topicPartitionAssignment {
	// every known partition starts out here; the ones emitted via the heap are removed
	unassignedPartitions := make(map[topicPartitionAssignment]bool, len(partition2AllPotentialConsumers))
	for partition := range partition2AllPotentialConsumers {
		unassignedPartitions[partition] = true
	}

	sortedPartitions := make([]topicPartitionAssignment, 0)
	if !isFreshAssignment && areSubscriptionsIdentical(partition2AllPotentialConsumers, consumer2AllPotentialPartitions) {
		// if this is a reassignment and the subscriptions are identical (all consumers can consumer from all topics)
		// then we just need to simply list partitions in a round robin fashion (from consumers with
		// most assigned partitions to those with least)
		assignments := filterAssignedPartitions(currentAssignment, partition2AllPotentialConsumers)

		// use priority-queue to evaluate consumer group members in descending-order based on
		// the number of topic partition assignments (i.e. consumers with most assignments first)
		pq := make(assignmentPriorityQueue, len(assignments))
		i := 0
		for consumerID, consumerAssignments := range assignments {
			pq[i] = &consumerGroupMember{
				id:          consumerID,
				assignments: consumerAssignments,
			}
			i++
		}
		heap.Init(&pq)

		for {
			// loop until no consumer-group members remain
			if pq.Len() == 0 {
				break
			}
			// pq[0] is the heap root
			member := pq[0]

			// partitions that were assigned to a different consumer last time
			// (prevPartitionIndex stays 0, i.e. the first partition, when none is found)
			var prevPartitionIndex int
			for i, partition := range member.assignments {
				if _, exists := partitionsWithADifferentPreviousAssignment[partition]; exists {
					prevPartitionIndex = i
					break
				}
			}

			if len(member.assignments) > 0 {
				partition := member.assignments[prevPartitionIndex]
				sortedPartitions = append(sortedPartitions, partition)
				delete(unassignedPartitions, partition)
				if prevPartitionIndex == 0 {
					member.assignments = member.assignments[1:]
				} else {
					member.assignments = append(member.assignments[:prevPartitionIndex], member.assignments[prevPartitionIndex+1:]...)
				}
				// the root's assignment count changed; restore the heap invariant
				heap.Fix(&pq, 0)
			} else {
				// the root has nothing left to contribute
				heap.Pop(&pq)
			}
		}

		for partition := range unassignedPartitions {
			sortedPartitions = append(sortedPartitions, partition)
		}
	} else {
		// an ascending sorted set of topic partitions based on how many consumers can potentially use them
		sortedPartitions = sortPartitionsByPotentialConsumerAssignments(partition2AllPotentialConsumers)
	}
	return sortedPartitions
}
636
637func sortMemberIDsByPartitionAssignments(assignments map[string][]topicPartitionAssignment) []string {
638 // sort the members by the number of partition assignments in ascending order
639 sortedMemberIDs := make([]string, 0, len(assignments))
640 for memberID := range assignments {
641 sortedMemberIDs = append(sortedMemberIDs, memberID)
642 }
643 sort.SliceStable(sortedMemberIDs, func(i, j int) bool {
644 ret := len(assignments[sortedMemberIDs[i]]) - len(assignments[sortedMemberIDs[j]])
645 if ret == 0 {
646 return sortedMemberIDs[i] < sortedMemberIDs[j]
647 }
648 return len(assignments[sortedMemberIDs[i]]) < len(assignments[sortedMemberIDs[j]])
649 })
650 return sortedMemberIDs
651}
652
653func sortPartitionsByPotentialConsumerAssignments(partition2AllPotentialConsumers map[topicPartitionAssignment][]string) []topicPartitionAssignment {
654 // sort the members by the number of partition assignments in descending order
655 sortedPartionIDs := make([]topicPartitionAssignment, len(partition2AllPotentialConsumers))
656 i := 0
657 for partition := range partition2AllPotentialConsumers {
658 sortedPartionIDs[i] = partition
659 i++
660 }
661 sort.Slice(sortedPartionIDs, func(i, j int) bool {
662 if len(partition2AllPotentialConsumers[sortedPartionIDs[i]]) == len(partition2AllPotentialConsumers[sortedPartionIDs[j]]) {
663 ret := strings.Compare(sortedPartionIDs[i].Topic, sortedPartionIDs[j].Topic)
664 if ret == 0 {
665 return sortedPartionIDs[i].Partition < sortedPartionIDs[j].Partition
666 }
667 return ret < 0
668 }
669 return len(partition2AllPotentialConsumers[sortedPartionIDs[i]]) < len(partition2AllPotentialConsumers[sortedPartionIDs[j]])
670 })
671 return sortedPartionIDs
672}
673
674func deepCopyPartitions(src []topicPartitionAssignment) []topicPartitionAssignment {
675 dst := make([]topicPartitionAssignment, len(src))
676 for i, partition := range src {
677 dst[i] = partition
678 }
679 return dst
680}
681
682func deepCopyAssignment(assignment map[string][]topicPartitionAssignment) map[string][]topicPartitionAssignment {
683 copy := make(map[string][]topicPartitionAssignment, len(assignment))
684 for memberID, subscriptions := range assignment {
685 copy[memberID] = append(subscriptions[:0:0], subscriptions...)
686 }
687 return copy
688}
689
690func areSubscriptionsIdentical(partition2AllPotentialConsumers map[topicPartitionAssignment][]string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment) bool {
691 curMembers := make(map[string]int)
692 for _, cur := range partition2AllPotentialConsumers {
693 if len(curMembers) == 0 {
694 for _, curMembersElem := range cur {
695 curMembers[curMembersElem]++
696 }
697 continue
698 }
699
700 if len(curMembers) != len(cur) {
701 return false
702 }
703
704 yMap := make(map[string]int)
705 for _, yElem := range cur {
706 yMap[yElem]++
707 }
708
709 for curMembersMapKey, curMembersMapVal := range curMembers {
710 if yMap[curMembersMapKey] != curMembersMapVal {
711 return false
712 }
713 }
714 }
715
716 curPartitions := make(map[topicPartitionAssignment]int)
717 for _, cur := range consumer2AllPotentialPartitions {
718 if len(curPartitions) == 0 {
719 for _, curPartitionElem := range cur {
720 curPartitions[curPartitionElem]++
721 }
722 continue
723 }
724
725 if len(curPartitions) != len(cur) {
726 return false
727 }
728
729 yMap := make(map[topicPartitionAssignment]int)
730 for _, yElem := range cur {
731 yMap[yElem]++
732 }
733
734 for curMembersMapKey, curMembersMapVal := range curPartitions {
735 if yMap[curMembersMapKey] != curMembersMapVal {
736 return false
737 }
738 }
739 }
740 return true
741}
742
743// We need to process subscriptions' user data with each consumer's reported generation in mind
744// higher generations overwrite lower generations in case of a conflict
745// note that a conflict could exist only if user data is for different generations
746func prepopulateCurrentAssignments(members map[string]ConsumerGroupMemberMetadata) (map[string][]topicPartitionAssignment, map[topicPartitionAssignment]consumerGenerationPair, error) {
747 currentAssignment := make(map[string][]topicPartitionAssignment)
748 prevAssignment := make(map[topicPartitionAssignment]consumerGenerationPair)
749
750 // for each partition we create a sorted map of its consumers by generation
751 sortedPartitionConsumersByGeneration := make(map[topicPartitionAssignment]map[int]string)
752 for memberID, meta := range members {
753 consumerUserData, err := deserializeTopicPartitionAssignment(meta.UserData)
754 if err != nil {
755 return nil, nil, err
756 }
757 for _, partition := range consumerUserData.partitions() {
758 if consumers, exists := sortedPartitionConsumersByGeneration[partition]; exists {
759 if consumerUserData.hasGeneration() {
760 if _, generationExists := consumers[consumerUserData.generation()]; generationExists {
761 // same partition is assigned to two consumers during the same rebalance.
762 // log a warning and skip this record
763 Logger.Printf("Topic %s Partition %d is assigned to multiple consumers following sticky assignment generation %d", partition.Topic, partition.Partition, consumerUserData.generation())
764 continue
765 } else {
766 consumers[consumerUserData.generation()] = memberID
767 }
768 } else {
769 consumers[defaultGeneration] = memberID
770 }
771 } else {
772 generation := defaultGeneration
773 if consumerUserData.hasGeneration() {
774 generation = consumerUserData.generation()
775 }
776 sortedPartitionConsumersByGeneration[partition] = map[int]string{generation: memberID}
777 }
778 }
779 }
780
781 // prevAssignment holds the prior ConsumerGenerationPair (before current) of each partition
782 // current and previous consumers are the last two consumers of each partition in the above sorted map
783 for partition, consumers := range sortedPartitionConsumersByGeneration {
784 // sort consumers by generation in decreasing order
785 var generations []int
786 for generation := range consumers {
787 generations = append(generations, generation)
788 }
789 sort.Sort(sort.Reverse(sort.IntSlice(generations)))
790
791 consumer := consumers[generations[0]]
792 if _, exists := currentAssignment[consumer]; !exists {
793 currentAssignment[consumer] = []topicPartitionAssignment{partition}
794 } else {
795 currentAssignment[consumer] = append(currentAssignment[consumer], partition)
796 }
797
798 // check for previous assignment, if any
799 if len(generations) > 1 {
800 prevAssignment[partition] = consumerGenerationPair{
801 MemberID: consumers[generations[1]],
802 Generation: generations[1],
803 }
804 }
805 }
806 return currentAssignment, prevAssignment, nil
807}
808
// consumerGenerationPair couples a member ID with a generation number.
// prepopulateCurrentAssignments uses it to record, per partition, the member
// that owned the partition in the second-highest reported generation.
type consumerGenerationPair struct {
	// MemberID is the consumer group member ID.
	MemberID string
	// Generation is the generation associated with MemberID.
	Generation int
}
813
// consumerPair represents a pair of Kafka consumer ids involved in a partition reassignment.
// It is a comparable value type and is used as a map key when indexing movements.
type consumerPair struct {
	// SrcMemberID is the member the partition moved away from.
	SrcMemberID string
	// DstMemberID is the member the partition moved to.
	DstMemberID string
}
819
// partitionMovements maintains some data structures to simplify lookup of partition movements among consumers.
// Both maps must be non-nil before any movement is recorded.
type partitionMovements struct {
	// PartitionMovementsByTopic indexes recorded movements as
	// topic -> (source, destination) consumer pair -> set of partitions that
	// moved along that pair (values are always true).
	PartitionMovementsByTopic map[string]map[consumerPair]map[topicPartitionAssignment]bool
	// Movements maps each moved partition to the single (source, destination)
	// pair currently recorded for it.
	Movements map[topicPartitionAssignment]consumerPair
}
825
826func (p *partitionMovements) removeMovementRecordOfPartition(partition topicPartitionAssignment) consumerPair {
827 pair := p.Movements[partition]
828 delete(p.Movements, partition)
829
830 partitionMovementsForThisTopic := p.PartitionMovementsByTopic[partition.Topic]
831 delete(partitionMovementsForThisTopic[pair], partition)
832 if len(partitionMovementsForThisTopic[pair]) == 0 {
833 delete(partitionMovementsForThisTopic, pair)
834 }
835 if len(p.PartitionMovementsByTopic[partition.Topic]) == 0 {
836 delete(p.PartitionMovementsByTopic, partition.Topic)
837 }
838 return pair
839}
840
841func (p *partitionMovements) addPartitionMovementRecord(partition topicPartitionAssignment, pair consumerPair) {
842 p.Movements[partition] = pair
843 if _, exists := p.PartitionMovementsByTopic[partition.Topic]; !exists {
844 p.PartitionMovementsByTopic[partition.Topic] = make(map[consumerPair]map[topicPartitionAssignment]bool)
845 }
846 partitionMovementsForThisTopic := p.PartitionMovementsByTopic[partition.Topic]
847 if _, exists := partitionMovementsForThisTopic[pair]; !exists {
848 partitionMovementsForThisTopic[pair] = make(map[topicPartitionAssignment]bool)
849 }
850 partitionMovementsForThisTopic[pair][partition] = true
851}
852
853func (p *partitionMovements) movePartition(partition topicPartitionAssignment, oldConsumer, newConsumer string) {
854 pair := consumerPair{
855 SrcMemberID: oldConsumer,
856 DstMemberID: newConsumer,
857 }
858 if _, exists := p.Movements[partition]; exists {
859 // this partition has previously moved
860 existingPair := p.removeMovementRecordOfPartition(partition)
861 if existingPair.DstMemberID != oldConsumer {
862 Logger.Printf("Existing pair DstMemberID %s was not equal to the oldConsumer ID %s", existingPair.DstMemberID, oldConsumer)
863 }
864 if existingPair.SrcMemberID != newConsumer {
865 // the partition is not moving back to its previous consumer
866 p.addPartitionMovementRecord(partition, consumerPair{
867 SrcMemberID: existingPair.SrcMemberID,
868 DstMemberID: newConsumer,
869 })
870 }
871 } else {
872 p.addPartitionMovementRecord(partition, pair)
873 }
874}
875
876func (p *partitionMovements) getTheActualPartitionToBeMoved(partition topicPartitionAssignment, oldConsumer, newConsumer string) topicPartitionAssignment {
877 if _, exists := p.PartitionMovementsByTopic[partition.Topic]; !exists {
878 return partition
879 }
880 if _, exists := p.Movements[partition]; exists {
881 // this partition has previously moved
882 if oldConsumer != p.Movements[partition].DstMemberID {
883 Logger.Printf("Partition movement DstMemberID %s was not equal to the oldConsumer ID %s", p.Movements[partition].DstMemberID, oldConsumer)
884 }
885 oldConsumer = p.Movements[partition].SrcMemberID
886 }
887
888 partitionMovementsForThisTopic := p.PartitionMovementsByTopic[partition.Topic]
889 reversePair := consumerPair{
890 SrcMemberID: newConsumer,
891 DstMemberID: oldConsumer,
892 }
893 if _, exists := partitionMovementsForThisTopic[reversePair]; !exists {
894 return partition
895 }
896 var reversePairPartition topicPartitionAssignment
897 for otherPartition := range partitionMovementsForThisTopic[reversePair] {
898 reversePairPartition = otherPartition
899 }
900 return reversePairPartition
901}
902
903func (p *partitionMovements) isLinked(src, dst string, pairs []consumerPair, currentPath []string) ([]string, bool) {
904 if src == dst {
905 return currentPath, false
906 }
907 if len(pairs) == 0 {
908 return currentPath, false
909 }
910 for _, pair := range pairs {
911 if src == pair.SrcMemberID && dst == pair.DstMemberID {
912 currentPath = append(currentPath, src, dst)
913 return currentPath, true
914 }
915 }
916
917 for _, pair := range pairs {
918 if pair.SrcMemberID == src {
919 // create a deep copy of the pairs, excluding the current pair
920 reducedSet := make([]consumerPair, len(pairs)-1)
921 i := 0
922 for _, p := range pairs {
923 if p != pair {
924 reducedSet[i] = pair
925 i++
926 }
927 }
928
929 currentPath = append(currentPath, pair.SrcMemberID)
930 return p.isLinked(pair.DstMemberID, dst, reducedSet, currentPath)
931 }
932 }
933 return currentPath, false
934}
935
936func (p *partitionMovements) in(cycle []string, cycles [][]string) bool {
937 superCycle := make([]string, len(cycle)-1)
938 for i := 0; i < len(cycle)-1; i++ {
939 superCycle[i] = cycle[i]
940 }
941 for _, c := range cycle {
942 superCycle = append(superCycle, c)
943 }
944 for _, foundCycle := range cycles {
945 if len(foundCycle) == len(cycle) && indexOfSubList(superCycle, foundCycle) != -1 {
946 return true
947 }
948 }
949 return false
950}
951
952func (p *partitionMovements) hasCycles(pairs []consumerPair) bool {
953 cycles := make([][]string, 0)
954 for _, pair := range pairs {
955 // create a deep copy of the pairs, excluding the current pair
956 reducedPairs := make([]consumerPair, len(pairs)-1)
957 i := 0
958 for _, p := range pairs {
959 if p != pair {
960 reducedPairs[i] = pair
961 i++
962 }
963 }
964 if path, linked := p.isLinked(pair.DstMemberID, pair.SrcMemberID, reducedPairs, []string{pair.SrcMemberID}); linked {
965 if !p.in(path, cycles) {
966 cycles = append(cycles, path)
967 Logger.Printf("A cycle of length %d was found: %v", len(path)-1, path)
968 }
969 }
970 }
971
972 // for now we want to make sure there is no partition movements of the same topic between a pair of consumers.
973 // the odds of finding a cycle among more than two consumers seem to be very low (according to various randomized
974 // tests with the given sticky algorithm) that it should not worth the added complexity of handling those cases.
975 for _, cycle := range cycles {
976 if len(cycle) == 3 {
977 return true
978 }
979 }
980 return false
981}
982
983func (p *partitionMovements) isSticky() bool {
984 for topic, movements := range p.PartitionMovementsByTopic {
985 movementPairs := make([]consumerPair, len(movements))
986 i := 0
987 for pair := range movements {
988 movementPairs[i] = pair
989 i++
990 }
991 if p.hasCycles(movementPairs) {
992 Logger.Printf("Stickiness is violated for topic %s", topic)
993 Logger.Printf("Partition movements for this topic occurred among the following consumer pairs: %v", movements)
994 return false
995 }
996 }
997 return true
998}
999
// indexOfSubList returns the index of the first position in source at which
// target occurs as a contiguous run, or -1 if it never does. An empty target
// matches at index 0.
func indexOfSubList(source []string, target []string) int {
	width := len(target)
	for start := 0; start+width <= len(source); start++ {
		window := source[start : start+width]
		matched := true
		for k, want := range target {
			if window[k] != want {
				matched = false
				break
			}
		}
		if matched {
			return start
		}
	}
	return -1
}
1018
// consumerGroupMember is the element type of assignmentPriorityQueue. It pairs
// a member ID with the partitions assigned to it.
type consumerGroupMember struct {
	// id is the member ID; assignmentPriorityQueue uses it as a tie-breaker.
	id string
	// assignments are the member's partitions; their count is the queue priority.
	assignments []topicPartitionAssignment
}
1023
1024// assignmentPriorityQueue is a priority-queue of consumer group members that is sorted
1025// in descending order (most assignments to least assignments).
1026type assignmentPriorityQueue []*consumerGroupMember
1027
1028func (pq assignmentPriorityQueue) Len() int { return len(pq) }
1029
1030func (pq assignmentPriorityQueue) Less(i, j int) bool {
1031 // order asssignment priority queue in descending order using assignment-count/member-id
1032 if len(pq[i].assignments) == len(pq[j].assignments) {
1033 return strings.Compare(pq[i].id, pq[j].id) > 0
1034 }
1035 return len(pq[i].assignments) > len(pq[j].assignments)
1036}
1037
1038func (pq assignmentPriorityQueue) Swap(i, j int) {
1039 pq[i], pq[j] = pq[j], pq[i]
1040}
1041
1042func (pq *assignmentPriorityQueue) Push(x interface{}) {
1043 member := x.(*consumerGroupMember)
1044 *pq = append(*pq, member)
1045}
1046
1047func (pq *assignmentPriorityQueue) Pop() interface{} {
1048 old := *pq
1049 n := len(old)
1050 member := old[n-1]
1051 *pq = old[0 : n-1]
1052 return member
1053}