blob: 56da276d2a1b009b9ec54799de44370fba634ac5 [file] [log] [blame]
Pragya Arya324337e2020-02-20 14:35:08 +05301package sarama
2
3import (
4 "container/heap"
5 "math"
6 "sort"
7 "strings"
8)
9
const (
	// RangeBalanceStrategyName identifies strategies that use the range partition assignment strategy
	RangeBalanceStrategyName = "range"

	// RoundRobinBalanceStrategyName identifies strategies that use the round-robin partition assignment strategy
	RoundRobinBalanceStrategyName = "roundrobin"

	// StickyBalanceStrategyName identifies strategies that use the sticky-partition assignment strategy
	StickyBalanceStrategyName = "sticky"

	// defaultGeneration is a sentinel generation value.
	// NOTE(review): presumably stands in for "no generation reported" when
	// processing member user data; its uses are outside this part of the
	// file — confirm against the callers.
	defaultGeneration = -1
)
22
// BalanceStrategyPlan is the result of any BalanceStrategy.Plan attempt.
// It holds the allocation of topic/partitions per member in the form of
// a `memberID -> topic -> partitions` map.
type BalanceStrategyPlan map[string]map[string][]int32

// Add assigns the given partitions of a topic to a member, appending to
// whatever the member already holds for that topic. Calling it without any
// partitions is a no-op and does not create an entry for the member.
func (p BalanceStrategyPlan) Add(memberID, topic string, partitions ...int32) {
	if len(partitions) == 0 {
		return
	}

	byTopic, known := p[memberID]
	if !known {
		byTopic = make(map[string][]int32, 1)
		p[memberID] = byTopic
	}
	byTopic[topic] = append(byTopic[topic], partitions...)
}
38
39// --------------------------------------------------------------------
40
// BalanceStrategy is used to balance topics and partitions
// across members of a consumer group.
type BalanceStrategy interface {
	// Name uniquely identifies the strategy.
	Name() string

	// Plan accepts a map of `memberID -> metadata` and a map of `topic -> partitions`
	// and returns a distribution plan, or an error if no plan could be produced.
	Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error)

	// AssignmentData returns the serialized assignment data for the specified
	// memberID, given the `topic -> partitions` assigned to it and the
	// generation ID. Strategies that need no such data may return nil.
	AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error)
}
55
56// --------------------------------------------------------------------
57
58// BalanceStrategyRange is the default and assigns partitions as ranges to consumer group members.
59// Example with one topic T with six partitions (0..5) and two members (M1, M2):
60// M1: {T: [0, 1, 2]}
61// M2: {T: [3, 4, 5]}
62var BalanceStrategyRange = &balanceStrategy{
63 name: RangeBalanceStrategyName,
64 coreFn: func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32) {
65 step := float64(len(partitions)) / float64(len(memberIDs))
66
67 for i, memberID := range memberIDs {
68 pos := float64(i)
69 min := int(math.Floor(pos*step + 0.5))
70 max := int(math.Floor((pos+1)*step + 0.5))
71 plan.Add(memberID, topic, partitions[min:max]...)
72 }
73 },
74}
75
76// BalanceStrategyRoundRobin assigns partitions to members in alternating order.
77// Example with topic T with six partitions (0..5) and two members (M1, M2):
78// M1: {T: [0, 2, 4]}
79// M2: {T: [1, 3, 5]}
80var BalanceStrategyRoundRobin = &balanceStrategy{
81 name: RoundRobinBalanceStrategyName,
82 coreFn: func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32) {
83 for i, part := range partitions {
84 memberID := memberIDs[i%len(memberIDs)]
85 plan.Add(memberID, topic, part)
86 }
87 },
88}
89
// BalanceStrategySticky assigns partitions to members with an attempt to preserve earlier assignments
// while maintaining a balanced partition distribution.
// Example with topic T with six partitions (0..5) and two members (M1, M2):
//   M1: {T: [0, 2, 4]}
//   M2: {T: [1, 3, 5]}
//
// On reassignment with an additional consumer, you might get an assignment plan like:
//   M1: {T: [0, 2]}
//   M2: {T: [1, 3]}
//   M3: {T: [4, 5]}
var BalanceStrategySticky = &stickyBalanceStrategy{}
102
103// --------------------------------------------------------------------
104
105type balanceStrategy struct {
106 name string
107 coreFn func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32)
108}
109
110// Name implements BalanceStrategy.
111func (s *balanceStrategy) Name() string { return s.name }
112
113// Plan implements BalanceStrategy.
114func (s *balanceStrategy) Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {
115 // Build members by topic map
116 mbt := make(map[string][]string)
117 for memberID, meta := range members {
118 for _, topic := range meta.Topics {
119 mbt[topic] = append(mbt[topic], memberID)
120 }
121 }
122
123 // Sort members for each topic
124 for topic, memberIDs := range mbt {
125 sort.Sort(&balanceStrategySortable{
126 topic: topic,
127 memberIDs: memberIDs,
128 })
129 }
130
131 // Assemble plan
132 plan := make(BalanceStrategyPlan, len(members))
133 for topic, memberIDs := range mbt {
134 s.coreFn(plan, memberIDs, topic, topics[topic])
135 }
136 return plan, nil
137}
138
// AssignmentData implements BalanceStrategy. The simple strategies do not
// require any shared assignment data, so all arguments are ignored and the
// result is always nil data with a nil error.
func (s *balanceStrategy) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) {
	return nil, nil
}
143
144type balanceStrategySortable struct {
145 topic string
146 memberIDs []string
147}
148
149func (p balanceStrategySortable) Len() int { return len(p.memberIDs) }
150func (p balanceStrategySortable) Swap(i, j int) {
151 p.memberIDs[i], p.memberIDs[j] = p.memberIDs[j], p.memberIDs[i]
152}
153func (p balanceStrategySortable) Less(i, j int) bool {
154 return balanceStrategyHashValue(p.topic, p.memberIDs[i]) < balanceStrategyHashValue(p.topic, p.memberIDs[j])
155}
156
// balanceStrategyHashValue folds the runes of all given strings, in order,
// into a 32-bit hash using XOR-then-multiply steps (the FNV-1a constants,
// applied per rune rather than per byte). With no input it returns the
// initial value 2166136261.
func balanceStrategyHashValue(vv ...string) uint32 {
	const (
		offsetBasis uint32 = 2166136261
		prime       uint32 = 16777619
	)

	sum := offsetBasis
	for idx := range vv {
		for _, r := range vv[idx] {
			sum = (sum ^ uint32(r)) * prime
		}
	}
	return sum
}
167
168type stickyBalanceStrategy struct {
169 movements partitionMovements
170}
171
172// Name implements BalanceStrategy.
173func (s *stickyBalanceStrategy) Name() string { return StickyBalanceStrategyName }
174
// Plan implements BalanceStrategy.
//
// It rebuilds the current assignment from the members' user data, drops
// partitions that no longer exist or whose topic the owning member is no
// longer subscribed to, and then calls balance to place the unassigned
// partitions and even out the distribution. Every member that ends up in the
// current assignment appears in the returned plan, with an empty topic map
// when it holds no partitions. An error is returned only when the members'
// current assignments cannot be prepopulated.
func (s *stickyBalanceStrategy) Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {
	// track partition movements during generation of the partition assignment plan
	s.movements = partitionMovements{
		Movements:                 make(map[topicPartitionAssignment]consumerPair),
		PartitionMovementsByTopic: make(map[string]map[consumerPair]map[topicPartitionAssignment]bool),
	}

	// prepopulate the current assignment state from userdata on the consumer group members
	currentAssignment, prevAssignment, err := prepopulateCurrentAssignments(members)
	if err != nil {
		return nil, err
	}

	// determine if we're dealing with a completely fresh assignment, or if there's existing assignment state
	isFreshAssignment := false
	if len(currentAssignment) == 0 {
		isFreshAssignment = true
	}

	// create a mapping of all current topic partitions and the consumers that can be assigned to them
	// (every known partition gets an entry, initially without any consumer)
	partition2AllPotentialConsumers := make(map[topicPartitionAssignment][]string)
	for topic, partitions := range topics {
		for _, partition := range partitions {
			partition2AllPotentialConsumers[topicPartitionAssignment{Topic: topic, Partition: partition}] = []string{}
		}
	}

	// create a mapping of all consumers to all potential topic partitions that can be assigned to them
	// also, populate the mapping of partitions to potential consumers
	consumer2AllPotentialPartitions := make(map[string][]topicPartitionAssignment, len(members))
	for memberID, meta := range members {
		consumer2AllPotentialPartitions[memberID] = make([]topicPartitionAssignment, 0)
		for _, topicSubscription := range meta.Topics {
			// only evaluate topic subscriptions that are present in the supplied topics map
			if _, found := topics[topicSubscription]; found {
				for _, partition := range topics[topicSubscription] {
					topicPartition := topicPartitionAssignment{Topic: topicSubscription, Partition: partition}
					consumer2AllPotentialPartitions[memberID] = append(consumer2AllPotentialPartitions[memberID], topicPartition)
					partition2AllPotentialConsumers[topicPartition] = append(partition2AllPotentialConsumers[topicPartition], memberID)
				}
			}
		}

		// add this consumer to currentAssignment (with an empty topic partition assignment) if it does not already exist
		if _, exists := currentAssignment[memberID]; !exists {
			currentAssignment[memberID] = make([]topicPartitionAssignment, 0)
		}
	}

	// create a mapping of each partition to its current consumer, where possible
	currentPartitionConsumers := make(map[topicPartitionAssignment]string, len(currentAssignment))
	// unvisitedPartitions starts as the set of all known partitions; whatever
	// is left after walking the current assignment has no owner yet
	unvisitedPartitions := make(map[topicPartitionAssignment]bool, len(partition2AllPotentialConsumers))
	for partition := range partition2AllPotentialConsumers {
		unvisitedPartitions[partition] = true
	}
	var unassignedPartitions []topicPartitionAssignment
	for memberID, partitions := range currentAssignment {
		var keepPartitions []topicPartitionAssignment
		for _, partition := range partitions {
			// If this partition no longer exists at all, likely due to the
			// topic being deleted, we remove the partition from the member.
			if _, exists := partition2AllPotentialConsumers[partition]; !exists {
				continue
			}
			delete(unvisitedPartitions, partition)
			currentPartitionConsumers[partition] = memberID

			// the member no longer subscribes to the partition's topic, so the
			// partition is taken away from it and queued for reassignment
			if !strsContains(members[memberID].Topics, partition.Topic) {
				unassignedPartitions = append(unassignedPartitions, partition)
				continue
			}
			keepPartitions = append(keepPartitions, partition)
		}
		currentAssignment[memberID] = keepPartitions
	}
	for unvisited := range unvisitedPartitions {
		unassignedPartitions = append(unassignedPartitions, unvisited)
	}

	// sort the topic partitions in order of priority for reassignment
	sortedPartitions := sortPartitions(currentAssignment, prevAssignment, isFreshAssignment, partition2AllPotentialConsumers, consumer2AllPotentialPartitions)

	// at this point we have preserved all valid topic partition to consumer assignments and removed
	// all invalid topic partitions and invalid consumers. Now we need to assign unassignedPartitions
	// to consumers so that the topic partition assignments are as balanced as possible.

	// an ascending sorted set of consumers based on how many topic partitions are already assigned to them
	sortedCurrentSubscriptions := sortMemberIDsByPartitionAssignments(currentAssignment)
	s.balance(currentAssignment, prevAssignment, sortedPartitions, unassignedPartitions, sortedCurrentSubscriptions, consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumers)

	// Assemble plan
	plan := make(BalanceStrategyPlan, len(currentAssignment))
	for memberID, assignments := range currentAssignment {
		if len(assignments) == 0 {
			// keep members without partitions visible in the plan
			plan[memberID] = make(map[string][]int32, 0)
		} else {
			for _, assignment := range assignments {
				plan.Add(memberID, assignment.Topic, assignment.Partition)
			}
		}
	}
	return plan, nil
}
279
280// AssignmentData serializes the set of topics currently assigned to the
281// specified member as part of the supplied balance plan
282func (s *stickyBalanceStrategy) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) {
283 return encode(&StickyAssignorUserDataV1{
284 Topics: topics,
285 Generation: generationID,
286 }, nil)
287}
288
// strsContains reports whether value is one of the elements of s.
func strsContains(s []string, value string) bool {
	for idx := 0; idx < len(s); idx++ {
		if s[idx] == value {
			return true
		}
	}
	return false
}
297
298// Balance assignments across consumers for maximum fairness and stickiness.
299func (s *stickyBalanceStrategy) balance(currentAssignment map[string][]topicPartitionAssignment, prevAssignment map[topicPartitionAssignment]consumerGenerationPair, sortedPartitions []topicPartitionAssignment, unassignedPartitions []topicPartitionAssignment, sortedCurrentSubscriptions []string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string, currentPartitionConsumer map[topicPartitionAssignment]string) {
300 initializing := false
301 if len(sortedCurrentSubscriptions) == 0 || len(currentAssignment[sortedCurrentSubscriptions[0]]) == 0 {
302 initializing = true
303 }
304
305 // assign all unassigned partitions
306 for _, partition := range unassignedPartitions {
307 // skip if there is no potential consumer for the partition
308 if len(partition2AllPotentialConsumers[partition]) == 0 {
309 continue
310 }
311 sortedCurrentSubscriptions = assignPartition(partition, sortedCurrentSubscriptions, currentAssignment, consumer2AllPotentialPartitions, currentPartitionConsumer)
312 }
313
314 // narrow down the reassignment scope to only those partitions that can actually be reassigned
315 for partition := range partition2AllPotentialConsumers {
316 if !canTopicPartitionParticipateInReassignment(partition, partition2AllPotentialConsumers) {
317 sortedPartitions = removeTopicPartitionFromMemberAssignments(sortedPartitions, partition)
318 }
319 }
320
321 // narrow down the reassignment scope to only those consumers that are subject to reassignment
322 fixedAssignments := make(map[string][]topicPartitionAssignment)
323 for memberID := range consumer2AllPotentialPartitions {
324 if !canConsumerParticipateInReassignment(memberID, currentAssignment, consumer2AllPotentialPartitions, partition2AllPotentialConsumers) {
325 fixedAssignments[memberID] = currentAssignment[memberID]
326 delete(currentAssignment, memberID)
327 sortedCurrentSubscriptions = sortMemberIDsByPartitionAssignments(currentAssignment)
328 }
329 }
330
331 // create a deep copy of the current assignment so we can revert to it if we do not get a more balanced assignment later
332 preBalanceAssignment := deepCopyAssignment(currentAssignment)
333 preBalancePartitionConsumers := make(map[topicPartitionAssignment]string, len(currentPartitionConsumer))
334 for k, v := range currentPartitionConsumer {
335 preBalancePartitionConsumers[k] = v
336 }
337
338 reassignmentPerformed := s.performReassignments(sortedPartitions, currentAssignment, prevAssignment, sortedCurrentSubscriptions, consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumer)
339
340 // if we are not preserving existing assignments and we have made changes to the current assignment
341 // make sure we are getting a more balanced assignment; otherwise, revert to previous assignment
342 if !initializing && reassignmentPerformed && getBalanceScore(currentAssignment) >= getBalanceScore(preBalanceAssignment) {
343 currentAssignment = deepCopyAssignment(preBalanceAssignment)
344 currentPartitionConsumer = make(map[topicPartitionAssignment]string, len(preBalancePartitionConsumers))
345 for k, v := range preBalancePartitionConsumers {
346 currentPartitionConsumer[k] = v
347 }
348 }
349
350 // add the fixed assignments (those that could not change) back
351 for consumer, assignments := range fixedAssignments {
352 currentAssignment[consumer] = assignments
353 }
354}
355
356// Calculate the balance score of the given assignment, as the sum of assigned partitions size difference of all consumer pairs.
357// A perfectly balanced assignment (with all consumers getting the same number of partitions) has a balance score of 0.
358// Lower balance score indicates a more balanced assignment.
359func getBalanceScore(assignment map[string][]topicPartitionAssignment) int {
360 consumer2AssignmentSize := make(map[string]int, len(assignment))
361 for memberID, partitions := range assignment {
362 consumer2AssignmentSize[memberID] = len(partitions)
363 }
364
365 var score float64
366 for memberID, consumerAssignmentSize := range consumer2AssignmentSize {
367 delete(consumer2AssignmentSize, memberID)
368 for _, otherConsumerAssignmentSize := range consumer2AssignmentSize {
369 score += math.Abs(float64(consumerAssignmentSize - otherConsumerAssignmentSize))
370 }
371 }
372 return int(score)
373}
374
375// Determine whether the current assignment plan is balanced.
376func isBalanced(currentAssignment map[string][]topicPartitionAssignment, sortedCurrentSubscriptions []string, allSubscriptions map[string][]topicPartitionAssignment) bool {
377 sortedCurrentSubscriptions = sortMemberIDsByPartitionAssignments(currentAssignment)
378 min := len(currentAssignment[sortedCurrentSubscriptions[0]])
379 max := len(currentAssignment[sortedCurrentSubscriptions[len(sortedCurrentSubscriptions)-1]])
380 if min >= max-1 {
381 // if minimum and maximum numbers of partitions assigned to consumers differ by at most one return true
382 return true
383 }
384
385 // create a mapping from partitions to the consumer assigned to them
386 allPartitions := make(map[topicPartitionAssignment]string)
387 for memberID, partitions := range currentAssignment {
388 for _, partition := range partitions {
389 if _, exists := allPartitions[partition]; exists {
390 Logger.Printf("Topic %s Partition %d is assigned more than one consumer", partition.Topic, partition.Partition)
391 }
392 allPartitions[partition] = memberID
393 }
394 }
395
396 // for each consumer that does not have all the topic partitions it can get make sure none of the topic partitions it
397 // could but did not get cannot be moved to it (because that would break the balance)
398 for _, memberID := range sortedCurrentSubscriptions {
399 consumerPartitions := currentAssignment[memberID]
400 consumerPartitionCount := len(consumerPartitions)
401
402 // skip if this consumer already has all the topic partitions it can get
403 if consumerPartitionCount == len(allSubscriptions[memberID]) {
404 continue
405 }
406
407 // otherwise make sure it cannot get any more
408 potentialTopicPartitions := allSubscriptions[memberID]
409 for _, partition := range potentialTopicPartitions {
410 if !memberAssignmentsIncludeTopicPartition(currentAssignment[memberID], partition) {
411 otherConsumer := allPartitions[partition]
412 otherConsumerPartitionCount := len(currentAssignment[otherConsumer])
413 if consumerPartitionCount < otherConsumerPartitionCount {
414 return false
415 }
416 }
417 }
418 }
419 return true
420}
421
422// Reassign all topic partitions that need reassignment until balanced.
423func (s *stickyBalanceStrategy) performReassignments(reassignablePartitions []topicPartitionAssignment, currentAssignment map[string][]topicPartitionAssignment, prevAssignment map[topicPartitionAssignment]consumerGenerationPair, sortedCurrentSubscriptions []string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string, currentPartitionConsumer map[topicPartitionAssignment]string) bool {
424 reassignmentPerformed := false
425 modified := false
426
427 // repeat reassignment until no partition can be moved to improve the balance
428 for {
429 modified = false
430 // reassign all reassignable partitions (starting from the partition with least potential consumers and if needed)
431 // until the full list is processed or a balance is achieved
432 for _, partition := range reassignablePartitions {
433 if isBalanced(currentAssignment, sortedCurrentSubscriptions, consumer2AllPotentialPartitions) {
434 break
435 }
436
437 // the partition must have at least two consumers
438 if len(partition2AllPotentialConsumers[partition]) <= 1 {
439 Logger.Printf("Expected more than one potential consumer for partition %s topic %d", partition.Topic, partition.Partition)
440 }
441
442 // the partition must have a consumer
443 consumer := currentPartitionConsumer[partition]
444 if consumer == "" {
445 Logger.Printf("Expected topic %s partition %d to be assigned to a consumer", partition.Topic, partition.Partition)
446 }
447
448 if _, exists := prevAssignment[partition]; exists {
449 if len(currentAssignment[consumer]) > (len(currentAssignment[prevAssignment[partition].MemberID]) + 1) {
450 sortedCurrentSubscriptions = s.reassignPartition(partition, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer, prevAssignment[partition].MemberID)
451 reassignmentPerformed = true
452 modified = true
453 continue
454 }
455 }
456
457 // check if a better-suited consumer exists for the partition; if so, reassign it
458 for _, otherConsumer := range partition2AllPotentialConsumers[partition] {
459 if len(currentAssignment[consumer]) > (len(currentAssignment[otherConsumer]) + 1) {
460 sortedCurrentSubscriptions = s.reassignPartitionToNewConsumer(partition, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer, consumer2AllPotentialPartitions)
461 reassignmentPerformed = true
462 modified = true
463 break
464 }
465 }
466 }
467 if !modified {
468 return reassignmentPerformed
469 }
470 }
471}
472
473// Identify a new consumer for a topic partition and reassign it.
474func (s *stickyBalanceStrategy) reassignPartitionToNewConsumer(partition topicPartitionAssignment, currentAssignment map[string][]topicPartitionAssignment, sortedCurrentSubscriptions []string, currentPartitionConsumer map[topicPartitionAssignment]string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment) []string {
475 for _, anotherConsumer := range sortedCurrentSubscriptions {
476 if memberAssignmentsIncludeTopicPartition(consumer2AllPotentialPartitions[anotherConsumer], partition) {
477 return s.reassignPartition(partition, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer, anotherConsumer)
478 }
479 }
480 return sortedCurrentSubscriptions
481}
482
483// Reassign a specific partition to a new consumer
484func (s *stickyBalanceStrategy) reassignPartition(partition topicPartitionAssignment, currentAssignment map[string][]topicPartitionAssignment, sortedCurrentSubscriptions []string, currentPartitionConsumer map[topicPartitionAssignment]string, newConsumer string) []string {
485 consumer := currentPartitionConsumer[partition]
486 // find the correct partition movement considering the stickiness requirement
487 partitionToBeMoved := s.movements.getTheActualPartitionToBeMoved(partition, consumer, newConsumer)
488 return s.processPartitionMovement(partitionToBeMoved, newConsumer, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer)
489}
490
491// Track the movement of a topic partition after assignment
492func (s *stickyBalanceStrategy) processPartitionMovement(partition topicPartitionAssignment, newConsumer string, currentAssignment map[string][]topicPartitionAssignment, sortedCurrentSubscriptions []string, currentPartitionConsumer map[topicPartitionAssignment]string) []string {
493 oldConsumer := currentPartitionConsumer[partition]
494 s.movements.movePartition(partition, oldConsumer, newConsumer)
495
496 currentAssignment[oldConsumer] = removeTopicPartitionFromMemberAssignments(currentAssignment[oldConsumer], partition)
497 currentAssignment[newConsumer] = append(currentAssignment[newConsumer], partition)
498 currentPartitionConsumer[partition] = newConsumer
499 return sortMemberIDsByPartitionAssignments(currentAssignment)
500}
501
502// Determine whether a specific consumer should be considered for topic partition assignment.
503func canConsumerParticipateInReassignment(memberID string, currentAssignment map[string][]topicPartitionAssignment, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string) bool {
504 currentPartitions := currentAssignment[memberID]
505 currentAssignmentSize := len(currentPartitions)
506 maxAssignmentSize := len(consumer2AllPotentialPartitions[memberID])
507 if currentAssignmentSize > maxAssignmentSize {
508 Logger.Printf("The consumer %s is assigned more partitions than the maximum possible", memberID)
509 }
510 if currentAssignmentSize < maxAssignmentSize {
511 // if a consumer is not assigned all its potential partitions it is subject to reassignment
512 return true
513 }
514 for _, partition := range currentPartitions {
515 if canTopicPartitionParticipateInReassignment(partition, partition2AllPotentialConsumers) {
516 return true
517 }
518 }
519 return false
520}
521
522// Only consider reassigning those topic partitions that have two or more potential consumers.
523func canTopicPartitionParticipateInReassignment(partition topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string) bool {
524 return len(partition2AllPotentialConsumers[partition]) >= 2
525}
526
527// The assignment should improve the overall balance of the partition assignments to consumers.
528func assignPartition(partition topicPartitionAssignment, sortedCurrentSubscriptions []string, currentAssignment map[string][]topicPartitionAssignment, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, currentPartitionConsumer map[topicPartitionAssignment]string) []string {
529 for _, memberID := range sortedCurrentSubscriptions {
530 if memberAssignmentsIncludeTopicPartition(consumer2AllPotentialPartitions[memberID], partition) {
531 currentAssignment[memberID] = append(currentAssignment[memberID], partition)
532 currentPartitionConsumer[partition] = memberID
533 break
534 }
535 }
536 return sortMemberIDsByPartitionAssignments(currentAssignment)
537}
538
539// Deserialize topic partition assignment data to aid with creation of a sticky assignment.
540func deserializeTopicPartitionAssignment(userDataBytes []byte) (StickyAssignorUserData, error) {
541 userDataV1 := &StickyAssignorUserDataV1{}
542 if err := decode(userDataBytes, userDataV1); err != nil {
543 userDataV0 := &StickyAssignorUserDataV0{}
544 if err := decode(userDataBytes, userDataV0); err != nil {
545 return nil, err
546 }
547 return userDataV0, nil
548 }
549 return userDataV1, nil
550}
551
552// filterAssignedPartitions returns a map of consumer group members to their list of previously-assigned topic partitions, limited
553// to those topic partitions currently reported by the Kafka cluster.
554func filterAssignedPartitions(currentAssignment map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string) map[string][]topicPartitionAssignment {
555 assignments := deepCopyAssignment(currentAssignment)
556 for memberID, partitions := range assignments {
557 // perform in-place filtering
558 i := 0
559 for _, partition := range partitions {
560 if _, exists := partition2AllPotentialConsumers[partition]; exists {
561 partitions[i] = partition
562 i++
563 }
564 }
565 assignments[memberID] = partitions[:i]
566 }
567 return assignments
568}
569
570func removeTopicPartitionFromMemberAssignments(assignments []topicPartitionAssignment, topic topicPartitionAssignment) []topicPartitionAssignment {
571 for i, assignment := range assignments {
572 if assignment == topic {
573 return append(assignments[:i], assignments[i+1:]...)
574 }
575 }
576 return assignments
577}
578
579func memberAssignmentsIncludeTopicPartition(assignments []topicPartitionAssignment, topic topicPartitionAssignment) bool {
580 for _, assignment := range assignments {
581 if assignment == topic {
582 return true
583 }
584 }
585 return false
586}
587
// sortPartitions returns the known topic partitions (the keys of
// partition2AllPotentialConsumers) in the order in which they should be
// considered for reassignment.
//
// When this is not a fresh assignment and all subscriptions are identical,
// the order is produced by repeatedly taking one partition from the consumer
// at the top of a priority queue, followed by all partitions that no consumer
// currently holds. In every other case the partitions are ordered by their
// number of potential consumers.
func sortPartitions(currentAssignment map[string][]topicPartitionAssignment, partitionsWithADifferentPreviousAssignment map[topicPartitionAssignment]consumerGenerationPair, isFreshAssignment bool, partition2AllPotentialConsumers map[topicPartitionAssignment][]string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment) []topicPartitionAssignment {
	// start with every known partition; entries are removed as soon as the
	// partition is emitted on behalf of a consumer that holds it
	unassignedPartitions := make(map[topicPartitionAssignment]bool, len(partition2AllPotentialConsumers))
	for partition := range partition2AllPotentialConsumers {
		unassignedPartitions[partition] = true
	}

	sortedPartitions := make([]topicPartitionAssignment, 0)
	if !isFreshAssignment && areSubscriptionsIdentical(partition2AllPotentialConsumers, consumer2AllPotentialPartitions) {
		// if this is a reassignment and the subscriptions are identical (all consumers can consume from all topics)
		// then we just need to simply list partitions in a round robin fashion (from consumers with
		// most assigned partitions to those with least)
		assignments := filterAssignedPartitions(currentAssignment, partition2AllPotentialConsumers)

		// use priority-queue to evaluate consumer group members in descending-order based on
		// the number of topic partition assignments (i.e. consumers with most assignments first)
		// NOTE(review): the ordering itself is defined by assignmentPriorityQueue,
		// which is declared outside this part of the file — confirm.
		pq := make(assignmentPriorityQueue, len(assignments))
		i := 0
		for consumerID, consumerAssignments := range assignments {
			pq[i] = &consumerGroupMember{
				id:          consumerID,
				assignments: consumerAssignments,
			}
			i++
		}
		heap.Init(&pq)

		for {
			// loop until no consumer-group members remain
			if pq.Len() == 0 {
				break
			}
			member := pq[0]

			// partitions that were assigned to a different consumer last time
			// (index of the first such partition; stays 0 when there is none)
			var prevPartitionIndex int
			for i, partition := range member.assignments {
				if _, exists := partitionsWithADifferentPreviousAssignment[partition]; exists {
					prevPartitionIndex = i
					break
				}
			}

			if len(member.assignments) > 0 {
				// emit the chosen partition and drop it from the member's list
				partition := member.assignments[prevPartitionIndex]
				sortedPartitions = append(sortedPartitions, partition)
				delete(unassignedPartitions, partition)
				if prevPartitionIndex == 0 {
					member.assignments = member.assignments[1:]
				} else {
					member.assignments = append(member.assignments[:prevPartitionIndex], member.assignments[prevPartitionIndex+1:]...)
				}
				// the member's list got shorter, so restore the heap order
				heap.Fix(&pq, 0)
			} else {
				// this member has nothing left to contribute
				heap.Pop(&pq)
			}
		}

		// partitions not held by any consumer go last
		for partition := range unassignedPartitions {
			sortedPartitions = append(sortedPartitions, partition)
		}
	} else {
		// an ascending sorted set of topic partitions based on how many consumers can potentially use them
		sortedPartitions = sortPartitionsByPotentialConsumerAssignments(partition2AllPotentialConsumers)
	}
	return sortedPartitions
}
654
655func sortMemberIDsByPartitionAssignments(assignments map[string][]topicPartitionAssignment) []string {
656 // sort the members by the number of partition assignments in ascending order
657 sortedMemberIDs := make([]string, 0, len(assignments))
658 for memberID := range assignments {
659 sortedMemberIDs = append(sortedMemberIDs, memberID)
660 }
661 sort.SliceStable(sortedMemberIDs, func(i, j int) bool {
662 ret := len(assignments[sortedMemberIDs[i]]) - len(assignments[sortedMemberIDs[j]])
663 if ret == 0 {
664 return sortedMemberIDs[i] < sortedMemberIDs[j]
665 }
666 return len(assignments[sortedMemberIDs[i]]) < len(assignments[sortedMemberIDs[j]])
667 })
668 return sortedMemberIDs
669}
670
671func sortPartitionsByPotentialConsumerAssignments(partition2AllPotentialConsumers map[topicPartitionAssignment][]string) []topicPartitionAssignment {
672 // sort the members by the number of partition assignments in descending order
673 sortedPartionIDs := make([]topicPartitionAssignment, len(partition2AllPotentialConsumers))
674 i := 0
675 for partition := range partition2AllPotentialConsumers {
676 sortedPartionIDs[i] = partition
677 i++
678 }
679 sort.Slice(sortedPartionIDs, func(i, j int) bool {
680 if len(partition2AllPotentialConsumers[sortedPartionIDs[i]]) == len(partition2AllPotentialConsumers[sortedPartionIDs[j]]) {
681 ret := strings.Compare(sortedPartionIDs[i].Topic, sortedPartionIDs[j].Topic)
682 if ret == 0 {
683 return sortedPartionIDs[i].Partition < sortedPartionIDs[j].Partition
684 }
685 return ret < 0
686 }
687 return len(partition2AllPotentialConsumers[sortedPartionIDs[i]]) < len(partition2AllPotentialConsumers[sortedPartionIDs[j]])
688 })
689 return sortedPartionIDs
690}
691
692func deepCopyPartitions(src []topicPartitionAssignment) []topicPartitionAssignment {
693 dst := make([]topicPartitionAssignment, len(src))
694 for i, partition := range src {
695 dst[i] = partition
696 }
697 return dst
698}
699
700func deepCopyAssignment(assignment map[string][]topicPartitionAssignment) map[string][]topicPartitionAssignment {
701 copy := make(map[string][]topicPartitionAssignment, len(assignment))
702 for memberID, subscriptions := range assignment {
703 copy[memberID] = append(subscriptions[:0:0], subscriptions...)
704 }
705 return copy
706}
707
708func areSubscriptionsIdentical(partition2AllPotentialConsumers map[topicPartitionAssignment][]string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment) bool {
709 curMembers := make(map[string]int)
710 for _, cur := range partition2AllPotentialConsumers {
711 if len(curMembers) == 0 {
712 for _, curMembersElem := range cur {
713 curMembers[curMembersElem]++
714 }
715 continue
716 }
717
718 if len(curMembers) != len(cur) {
719 return false
720 }
721
722 yMap := make(map[string]int)
723 for _, yElem := range cur {
724 yMap[yElem]++
725 }
726
727 for curMembersMapKey, curMembersMapVal := range curMembers {
728 if yMap[curMembersMapKey] != curMembersMapVal {
729 return false
730 }
731 }
732 }
733
734 curPartitions := make(map[topicPartitionAssignment]int)
735 for _, cur := range consumer2AllPotentialPartitions {
736 if len(curPartitions) == 0 {
737 for _, curPartitionElem := range cur {
738 curPartitions[curPartitionElem]++
739 }
740 continue
741 }
742
743 if len(curPartitions) != len(cur) {
744 return false
745 }
746
747 yMap := make(map[topicPartitionAssignment]int)
748 for _, yElem := range cur {
749 yMap[yElem]++
750 }
751
752 for curMembersMapKey, curMembersMapVal := range curPartitions {
753 if yMap[curMembersMapKey] != curMembersMapVal {
754 return false
755 }
756 }
757 }
758 return true
759}
760
761// We need to process subscriptions' user data with each consumer's reported generation in mind
762// higher generations overwrite lower generations in case of a conflict
763// note that a conflict could exist only if user data is for different generations
764func prepopulateCurrentAssignments(members map[string]ConsumerGroupMemberMetadata) (map[string][]topicPartitionAssignment, map[topicPartitionAssignment]consumerGenerationPair, error) {
765 currentAssignment := make(map[string][]topicPartitionAssignment)
766 prevAssignment := make(map[topicPartitionAssignment]consumerGenerationPair)
767
768 // for each partition we create a sorted map of its consumers by generation
769 sortedPartitionConsumersByGeneration := make(map[topicPartitionAssignment]map[int]string)
770 for memberID, meta := range members {
771 consumerUserData, err := deserializeTopicPartitionAssignment(meta.UserData)
772 if err != nil {
773 return nil, nil, err
774 }
775 for _, partition := range consumerUserData.partitions() {
776 if consumers, exists := sortedPartitionConsumersByGeneration[partition]; exists {
777 if consumerUserData.hasGeneration() {
778 if _, generationExists := consumers[consumerUserData.generation()]; generationExists {
779 // same partition is assigned to two consumers during the same rebalance.
780 // log a warning and skip this record
781 Logger.Printf("Topic %s Partition %d is assigned to multiple consumers following sticky assignment generation %d", partition.Topic, partition.Partition, consumerUserData.generation())
782 continue
783 } else {
784 consumers[consumerUserData.generation()] = memberID
785 }
786 } else {
787 consumers[defaultGeneration] = memberID
788 }
789 } else {
790 generation := defaultGeneration
791 if consumerUserData.hasGeneration() {
792 generation = consumerUserData.generation()
793 }
794 sortedPartitionConsumersByGeneration[partition] = map[int]string{generation: memberID}
795 }
796 }
797 }
798
799 // prevAssignment holds the prior ConsumerGenerationPair (before current) of each partition
800 // current and previous consumers are the last two consumers of each partition in the above sorted map
801 for partition, consumers := range sortedPartitionConsumersByGeneration {
802 // sort consumers by generation in decreasing order
803 var generations []int
804 for generation := range consumers {
805 generations = append(generations, generation)
806 }
807 sort.Sort(sort.Reverse(sort.IntSlice(generations)))
808
809 consumer := consumers[generations[0]]
810 if _, exists := currentAssignment[consumer]; !exists {
811 currentAssignment[consumer] = []topicPartitionAssignment{partition}
812 } else {
813 currentAssignment[consumer] = append(currentAssignment[consumer], partition)
814 }
815
816 // check for previous assignment, if any
817 if len(generations) > 1 {
818 prevAssignment[partition] = consumerGenerationPair{
819 MemberID: consumers[generations[1]],
820 Generation: generations[1],
821 }
822 }
823 }
824 return currentAssignment, prevAssignment, nil
825}
826
// consumerGenerationPair couples a member ID with a generation number.
// prepopulateCurrentAssignments uses it to record which member claimed a
// partition before its current owner, and in which generation.
type consumerGenerationPair struct {
	// MemberID identifies the consumer group member.
	MemberID string
	// Generation is the generation the member reported for its claim.
	Generation int
}
831
// consumerPair names the two consumers involved in a partition reassignment:
// the member the partition is taken from and the member it is handed to. The
// type is comparable and is used as a map key by partitionMovements.
type consumerPair struct {
	// SrcMemberID is the member the partition moves away from.
	SrcMemberID string
	// DstMemberID is the member the partition moves to.
	DstMemberID string
}
837
838// partitionMovements maintains some data structures to simplify lookup of partition movements among consumers.
839type partitionMovements struct {
840 PartitionMovementsByTopic map[string]map[consumerPair]map[topicPartitionAssignment]bool
841 Movements map[topicPartitionAssignment]consumerPair
842}
843
844func (p *partitionMovements) removeMovementRecordOfPartition(partition topicPartitionAssignment) consumerPair {
845 pair := p.Movements[partition]
846 delete(p.Movements, partition)
847
848 partitionMovementsForThisTopic := p.PartitionMovementsByTopic[partition.Topic]
849 delete(partitionMovementsForThisTopic[pair], partition)
850 if len(partitionMovementsForThisTopic[pair]) == 0 {
851 delete(partitionMovementsForThisTopic, pair)
852 }
853 if len(p.PartitionMovementsByTopic[partition.Topic]) == 0 {
854 delete(p.PartitionMovementsByTopic, partition.Topic)
855 }
856 return pair
857}
858
859func (p *partitionMovements) addPartitionMovementRecord(partition topicPartitionAssignment, pair consumerPair) {
860 p.Movements[partition] = pair
861 if _, exists := p.PartitionMovementsByTopic[partition.Topic]; !exists {
862 p.PartitionMovementsByTopic[partition.Topic] = make(map[consumerPair]map[topicPartitionAssignment]bool)
863 }
864 partitionMovementsForThisTopic := p.PartitionMovementsByTopic[partition.Topic]
865 if _, exists := partitionMovementsForThisTopic[pair]; !exists {
866 partitionMovementsForThisTopic[pair] = make(map[topicPartitionAssignment]bool)
867 }
868 partitionMovementsForThisTopic[pair][partition] = true
869}
870
871func (p *partitionMovements) movePartition(partition topicPartitionAssignment, oldConsumer, newConsumer string) {
872 pair := consumerPair{
873 SrcMemberID: oldConsumer,
874 DstMemberID: newConsumer,
875 }
876 if _, exists := p.Movements[partition]; exists {
877 // this partition has previously moved
878 existingPair := p.removeMovementRecordOfPartition(partition)
879 if existingPair.DstMemberID != oldConsumer {
880 Logger.Printf("Existing pair DstMemberID %s was not equal to the oldConsumer ID %s", existingPair.DstMemberID, oldConsumer)
881 }
882 if existingPair.SrcMemberID != newConsumer {
883 // the partition is not moving back to its previous consumer
884 p.addPartitionMovementRecord(partition, consumerPair{
885 SrcMemberID: existingPair.SrcMemberID,
886 DstMemberID: newConsumer,
887 })
888 }
889 } else {
890 p.addPartitionMovementRecord(partition, pair)
891 }
892}
893
894func (p *partitionMovements) getTheActualPartitionToBeMoved(partition topicPartitionAssignment, oldConsumer, newConsumer string) topicPartitionAssignment {
895 if _, exists := p.PartitionMovementsByTopic[partition.Topic]; !exists {
896 return partition
897 }
898 if _, exists := p.Movements[partition]; exists {
899 // this partition has previously moved
900 if oldConsumer != p.Movements[partition].DstMemberID {
901 Logger.Printf("Partition movement DstMemberID %s was not equal to the oldConsumer ID %s", p.Movements[partition].DstMemberID, oldConsumer)
902 }
903 oldConsumer = p.Movements[partition].SrcMemberID
904 }
905
906 partitionMovementsForThisTopic := p.PartitionMovementsByTopic[partition.Topic]
907 reversePair := consumerPair{
908 SrcMemberID: newConsumer,
909 DstMemberID: oldConsumer,
910 }
911 if _, exists := partitionMovementsForThisTopic[reversePair]; !exists {
912 return partition
913 }
914 var reversePairPartition topicPartitionAssignment
915 for otherPartition := range partitionMovementsForThisTopic[reversePair] {
916 reversePairPartition = otherPartition
917 }
918 return reversePairPartition
919}
920
921func (p *partitionMovements) isLinked(src, dst string, pairs []consumerPair, currentPath []string) ([]string, bool) {
922 if src == dst {
923 return currentPath, false
924 }
925 if len(pairs) == 0 {
926 return currentPath, false
927 }
928 for _, pair := range pairs {
929 if src == pair.SrcMemberID && dst == pair.DstMemberID {
930 currentPath = append(currentPath, src, dst)
931 return currentPath, true
932 }
933 }
934
935 for _, pair := range pairs {
936 if pair.SrcMemberID == src {
937 // create a deep copy of the pairs, excluding the current pair
938 reducedSet := make([]consumerPair, len(pairs)-1)
939 i := 0
940 for _, p := range pairs {
941 if p != pair {
942 reducedSet[i] = pair
943 i++
944 }
945 }
946
947 currentPath = append(currentPath, pair.SrcMemberID)
948 return p.isLinked(pair.DstMemberID, dst, reducedSet, currentPath)
949 }
950 }
951 return currentPath, false
952}
953
954func (p *partitionMovements) in(cycle []string, cycles [][]string) bool {
955 superCycle := make([]string, len(cycle)-1)
956 for i := 0; i < len(cycle)-1; i++ {
957 superCycle[i] = cycle[i]
958 }
959 for _, c := range cycle {
960 superCycle = append(superCycle, c)
961 }
962 for _, foundCycle := range cycles {
963 if len(foundCycle) == len(cycle) && indexOfSubList(superCycle, foundCycle) != -1 {
964 return true
965 }
966 }
967 return false
968}
969
970func (p *partitionMovements) hasCycles(pairs []consumerPair) bool {
971 cycles := make([][]string, 0)
972 for _, pair := range pairs {
973 // create a deep copy of the pairs, excluding the current pair
974 reducedPairs := make([]consumerPair, len(pairs)-1)
975 i := 0
976 for _, p := range pairs {
977 if p != pair {
978 reducedPairs[i] = pair
979 i++
980 }
981 }
982 if path, linked := p.isLinked(pair.DstMemberID, pair.SrcMemberID, reducedPairs, []string{pair.SrcMemberID}); linked {
983 if !p.in(path, cycles) {
984 cycles = append(cycles, path)
985 Logger.Printf("A cycle of length %d was found: %v", len(path)-1, path)
986 }
987 }
988 }
989
990 // for now we want to make sure there is no partition movements of the same topic between a pair of consumers.
991 // the odds of finding a cycle among more than two consumers seem to be very low (according to various randomized
992 // tests with the given sticky algorithm) that it should not worth the added complexity of handling those cases.
993 for _, cycle := range cycles {
994 if len(cycle) == 3 {
995 return true
996 }
997 }
998 return false
999}
1000
1001func (p *partitionMovements) isSticky() bool {
1002 for topic, movements := range p.PartitionMovementsByTopic {
1003 movementPairs := make([]consumerPair, len(movements))
1004 i := 0
1005 for pair := range movements {
1006 movementPairs[i] = pair
1007 i++
1008 }
1009 if p.hasCycles(movementPairs) {
1010 Logger.Printf("Stickiness is violated for topic %s", topic)
1011 Logger.Printf("Partition movements for this topic occurred among the following consumer pairs: %v", movements)
1012 return false
1013 }
1014 }
1015 return true
1016}
1017
// indexOfSubList returns the index of the first occurrence of target as a
// contiguous run inside source, or -1 when there is none. An empty target
// matches at index 0.
func indexOfSubList(source []string, target []string) int {
	lastStart := len(source) - len(target)
	for start := 0; start <= lastStart; start++ {
		matched := true
		for offset, want := range target {
			if source[start+offset] != want {
				// element mismatch, try the next starting position
				matched = false
				break
			}
		}
		if matched {
			return start
		}
	}
	return -1
}
1036
1037type consumerGroupMember struct {
1038 id string
1039 assignments []topicPartitionAssignment
1040}
1041
1042// assignmentPriorityQueue is a priority-queue of consumer group members that is sorted
1043// in descending order (most assignments to least assignments).
1044type assignmentPriorityQueue []*consumerGroupMember
1045
1046func (pq assignmentPriorityQueue) Len() int { return len(pq) }
1047
1048func (pq assignmentPriorityQueue) Less(i, j int) bool {
1049 // order asssignment priority queue in descending order using assignment-count/member-id
1050 if len(pq[i].assignments) == len(pq[j].assignments) {
1051 return strings.Compare(pq[i].id, pq[j].id) > 0
1052 }
1053 return len(pq[i].assignments) > len(pq[j].assignments)
1054}
1055
1056func (pq assignmentPriorityQueue) Swap(i, j int) {
1057 pq[i], pq[j] = pq[j], pq[i]
1058}
1059
1060func (pq *assignmentPriorityQueue) Push(x interface{}) {
1061 member := x.(*consumerGroupMember)
1062 *pq = append(*pq, member)
1063}
1064
1065func (pq *assignmentPriorityQueue) Pop() interface{} {
1066 old := *pq
1067 n := len(old)
1068 member := old[n-1]
1069 *pq = old[0 : n-1]
1070 return member
1071}