blob: 6a708e729ee5853cad603c0af5728d31295b677e [file] [log] [blame]
onkarkundargi72cfd362020-02-27 12:34:37 +05301package sarama
2
3import (
4 "hash"
5 "hash/fnv"
6 "math/rand"
7 "time"
8)
9
10// Partitioner is anything that, given a Kafka message and a number of partitions indexed [0...numPartitions-1],
11// decides to which partition to send the message. RandomPartitioner, RoundRobinPartitioner and HashPartitioner are provided
12// as simple default implementations.
13type Partitioner interface {
14 // Partition takes a message and partition count and chooses a partition
15 Partition(message *ProducerMessage, numPartitions int32) (int32, error)
16
17 // RequiresConsistency indicates to the user of the partitioner whether the
18 // mapping of key->partition is consistent or not. Specifically, if a
19 // partitioner requires consistency then it must be allowed to choose from all
20 // partitions (even ones known to be unavailable), and its choice must be
21 // respected by the caller. The obvious example is the HashPartitioner.
22 RequiresConsistency() bool
23}
24
25// DynamicConsistencyPartitioner can optionally be implemented by Partitioners
26// in order to allow more flexibility than is originally allowed by the
27// RequiresConsistency method in the Partitioner interface. This allows
28// partitioners to require consistency sometimes, but not all times. It's useful
29// for, e.g., the HashPartitioner, which does not require consistency if the
30// message key is nil.
31type DynamicConsistencyPartitioner interface {
32 Partitioner
33
34 // MessageRequiresConsistency is similar to Partitioner.RequiresConsistency,
35 // but takes in the message being partitioned so that the partitioner can
36 // make a per-message determination.
37 MessageRequiresConsistency(message *ProducerMessage) bool
38}
39
40// PartitionerConstructor is the type for a function capable of constructing new Partitioners.
41type PartitionerConstructor func(topic string) Partitioner
42
43type manualPartitioner struct{}
44
45// HashPartitionOption lets you modify default values of the partitioner
46type HashPartitionerOption func(*hashPartitioner)
47
48// WithAbsFirst means that the partitioner handles absolute values
49// in the same way as the reference Java implementation
50func WithAbsFirst() HashPartitionerOption {
51 return func(hp *hashPartitioner) {
52 hp.referenceAbs = true
53 }
54}
55
56// WithCustomHashFunction lets you specify what hash function to use for the partitioning
57func WithCustomHashFunction(hasher func() hash.Hash32) HashPartitionerOption {
58 return func(hp *hashPartitioner) {
59 hp.hasher = hasher()
60 }
61}
62
63// WithCustomFallbackPartitioner lets you specify what HashPartitioner should be used in case a Distribution Key is empty
64func WithCustomFallbackPartitioner(randomHP *hashPartitioner) HashPartitionerOption {
65 return func(hp *hashPartitioner) {
66 hp.random = hp
67 }
68}
69
70// NewManualPartitioner returns a Partitioner which uses the partition manually set in the provided
71// ProducerMessage's Partition field as the partition to produce to.
72func NewManualPartitioner(topic string) Partitioner {
73 return new(manualPartitioner)
74}
75
76func (p *manualPartitioner) Partition(message *ProducerMessage, numPartitions int32) (int32, error) {
77 return message.Partition, nil
78}
79
80func (p *manualPartitioner) RequiresConsistency() bool {
81 return true
82}
83
84type randomPartitioner struct {
85 generator *rand.Rand
86}
87
88// NewRandomPartitioner returns a Partitioner which chooses a random partition each time.
89func NewRandomPartitioner(topic string) Partitioner {
90 p := new(randomPartitioner)
91 p.generator = rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
92 return p
93}
94
95func (p *randomPartitioner) Partition(message *ProducerMessage, numPartitions int32) (int32, error) {
96 return int32(p.generator.Intn(int(numPartitions))), nil
97}
98
99func (p *randomPartitioner) RequiresConsistency() bool {
100 return false
101}
102
103type roundRobinPartitioner struct {
104 partition int32
105}
106
107// NewRoundRobinPartitioner returns a Partitioner which walks through the available partitions one at a time.
108func NewRoundRobinPartitioner(topic string) Partitioner {
109 return &roundRobinPartitioner{}
110}
111
112func (p *roundRobinPartitioner) Partition(message *ProducerMessage, numPartitions int32) (int32, error) {
113 if p.partition >= numPartitions {
114 p.partition = 0
115 }
116 ret := p.partition
117 p.partition++
118 return ret, nil
119}
120
121func (p *roundRobinPartitioner) RequiresConsistency() bool {
122 return false
123}
124
125type hashPartitioner struct {
126 random Partitioner
127 hasher hash.Hash32
128 referenceAbs bool
129}
130
131// NewCustomHashPartitioner is a wrapper around NewHashPartitioner, allowing the use of custom hasher.
132// The argument is a function providing the instance, implementing the hash.Hash32 interface. This is to ensure that
133// each partition dispatcher gets its own hasher, to avoid concurrency issues by sharing an instance.
134func NewCustomHashPartitioner(hasher func() hash.Hash32) PartitionerConstructor {
135 return func(topic string) Partitioner {
136 p := new(hashPartitioner)
137 p.random = NewRandomPartitioner(topic)
138 p.hasher = hasher()
139 p.referenceAbs = false
140 return p
141 }
142}
143
144// NewCustomPartitioner creates a default Partitioner but lets you specify the behavior of each component via options
145func NewCustomPartitioner(options ...HashPartitionerOption) PartitionerConstructor {
146 return func(topic string) Partitioner {
147 p := new(hashPartitioner)
148 p.random = NewRandomPartitioner(topic)
149 p.hasher = fnv.New32a()
150 p.referenceAbs = false
151 for _, option := range options {
152 option(p)
153 }
154 return p
155 }
156}
157
158// NewHashPartitioner returns a Partitioner which behaves as follows. If the message's key is nil then a
159// random partition is chosen. Otherwise the FNV-1a hash of the encoded bytes of the message key is used,
160// modulus the number of partitions. This ensures that messages with the same key always end up on the
161// same partition.
162func NewHashPartitioner(topic string) Partitioner {
163 p := new(hashPartitioner)
164 p.random = NewRandomPartitioner(topic)
165 p.hasher = fnv.New32a()
166 p.referenceAbs = false
167 return p
168}
169
170// NewReferenceHashPartitioner is like NewHashPartitioner except that it handles absolute values
171// in the same way as the reference Java implementation. NewHashPartitioner was supposed to do
172// that but it had a mistake and now there are people depending on both behaviours. This will
173// all go away on the next major version bump.
174func NewReferenceHashPartitioner(topic string) Partitioner {
175 p := new(hashPartitioner)
176 p.random = NewRandomPartitioner(topic)
177 p.hasher = fnv.New32a()
178 p.referenceAbs = true
179 return p
180}
181
182func (p *hashPartitioner) Partition(message *ProducerMessage, numPartitions int32) (int32, error) {
183 if message.Key == nil {
184 return p.random.Partition(message, numPartitions)
185 }
186 bytes, err := message.Key.Encode()
187 if err != nil {
188 return -1, err
189 }
190 p.hasher.Reset()
191 _, err = p.hasher.Write(bytes)
192 if err != nil {
193 return -1, err
194 }
195 var partition int32
196 // Turns out we were doing our absolute value in a subtly different way from the upstream
197 // implementation, but now we need to maintain backwards compat for people who started using
198 // the old version; if referenceAbs is set we are compatible with the reference java client
199 // but not past Sarama versions
200 if p.referenceAbs {
201 partition = (int32(p.hasher.Sum32()) & 0x7fffffff) % numPartitions
202 } else {
203 partition = int32(p.hasher.Sum32()) % numPartitions
204 if partition < 0 {
205 partition = -partition
206 }
207 }
208 return partition, nil
209}
210
211func (p *hashPartitioner) RequiresConsistency() bool {
212 return true
213}
214
215func (p *hashPartitioner) MessageRequiresConsistency(message *ProducerMessage) bool {
216 return message.Key != nil
217}