blob: a1446d606a483a9c1dfa5a096517f024afba0328 [file] [log] [blame]
Matteo Scandolo18f5eb12020-04-17 10:34:25 -07001// Copyright (c) 2018 Burak Sezer
2// All rights reserved.
3//
4// This code is licensed under the MIT License.
5//
6// Permission is hereby granted, free of charge, to any person obtaining a copy
7// of this software and associated documentation files(the "Software"), to deal
8// in the Software without restriction, including without limitation the rights
9// to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
10// copies of the Software, and to permit persons to whom the Software is
11// furnished to do so, subject to the following conditions :
12//
13// The above copyright notice and this permission notice shall be included in
14// all copies or substantial portions of the Software.
15//
16// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
19// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22// THE SOFTWARE.
23
24// Package consistent provides a consistent hashing function with bounded loads.
25// For more information about the underlying algorithm, please take a look at
26// https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html
27//
28// Example Use:
29// cfg := consistent.Config{
30// PartitionCount: 71,
31// ReplicationFactor: 20,
32// Load: 1.25,
33// Hasher: hasher{},
34// }
35//
36// // Create a new consistent object
37// // You may call this with a list of members
38// // instead of adding them one by one.
39// c := consistent.New(members, cfg)
40//
41// // myMember struct just needs to implement a String method.
42// // New/Add/Remove distributes partitions among members using the algorithm
43// // defined on Google Research Blog.
44// c.Add(myMember)
45//
46// key := []byte("my-key")
47// // LocateKey hashes the key and calculates partition ID with
48// // this modulo operation: MOD(hash result, partition count)
49// // The owner of the partition is already calculated by New/Add/Remove.
50// // LocateKey just returns the member which's responsible for the key.
51// member := c.LocateKey(key)
52//
53package consistent
54
55import (
56 "encoding/binary"
57 "errors"
58 "fmt"
59 "math"
60 "sort"
61 "sync"
62)
63
64var (
65 //ErrInsufficientMemberCount represents an error which means there are not enough members to complete the task.
66 ErrInsufficientMemberCount = errors.New("insufficient member count")
67
68 // ErrMemberNotFound represents an error which means requested member could not be found in consistent hash ring.
69 ErrMemberNotFound = errors.New("member could not be found in ring")
70)
71
72// Hasher is responsible for generating unsigned, 64 bit hash of provided byte slice.
73// Hasher should minimize collisions (generating same hash for different byte slice)
74// and while performance is also important fast functions are preferable (i.e.
75// you can use FarmHash family).
76type Hasher interface {
77 Sum64([]byte) uint64
78}
79
80// Member interface represents a member in consistent hash ring.
81type Member interface {
82 String() string
83}
84
85// Config represents a structure to control consistent package.
86type Config struct {
87 // Hasher is responsible for generating unsigned, 64 bit hash of provided byte slice.
88 Hasher Hasher
89
90 // Keys are distributed among partitions. Prime numbers are good to
91 // distribute keys uniformly. Select a big PartitionCount if you have
92 // too many keys.
93 PartitionCount int
94
95 // Members are replicated on consistent hash ring. This number means that a member
96 // how many times replicated on the ring.
97 ReplicationFactor int
98
99 // Load is used to calculate average load. See the code, the paper and Google's blog post to learn about it.
100 Load float64
101}
102
103// Consistent holds the information about the members of the consistent hash circle.
104type Consistent struct {
105 mu sync.RWMutex
106
107 config Config
108 hasher Hasher
109 sortedSet []uint64
110 partitionCount uint64
111 loads map[string]float64
112 members map[string]*Member
113 partitions map[int]*Member
114 ring map[uint64]*Member
115}
116
117// New creates and returns a new Consistent object.
118func New(members []Member, config Config) *Consistent {
119 c := &Consistent{
120 config: config,
121 members: make(map[string]*Member),
122 partitionCount: uint64(config.PartitionCount),
123 ring: make(map[uint64]*Member),
124 }
125 if config.Hasher == nil {
126 panic("Hasher cannot be nil")
127 }
128 // TODO: Check configuration here
129 c.hasher = config.Hasher
130 for _, member := range members {
131 c.add(member)
132 }
133 if members != nil {
134 c.distributePartitions()
135 }
136 return c
137}
138
139// GetMembers returns a thread-safe copy of members.
140func (c *Consistent) GetMembers() []Member {
141 c.mu.RLock()
142 defer c.mu.RUnlock()
143
144 // Create a thread-safe copy of member list.
145 members := make([]Member, 0, len(c.members))
146 for _, member := range c.members {
147 members = append(members, *member)
148 }
149 return members
150}
151
152// AverageLoad exposes the current average load.
153func (c *Consistent) AverageLoad() float64 {
154 avgLoad := float64(c.partitionCount/uint64(len(c.members))) * c.config.Load
155 return math.Ceil(avgLoad)
156}
157
158func (c *Consistent) distributeWithLoad(partID, idx int, partitions map[int]*Member, loads map[string]float64) {
159 avgLoad := c.AverageLoad()
160 var count int
161 for {
162 count++
163 if count >= len(c.sortedSet) {
164 // User needs to decrease partition count, increase member count or increase load factor.
165 panic("not enough room to distribute partitions")
166 }
167 i := c.sortedSet[idx]
168 member := *c.ring[i]
169 load := loads[member.String()]
170 if load+1 <= avgLoad {
171 partitions[partID] = &member
172 loads[member.String()]++
173 return
174 }
175 idx++
176 if idx >= len(c.sortedSet) {
177 idx = 0
178 }
179 }
180}
181
182func (c *Consistent) distributePartitions() {
183 loads := make(map[string]float64)
184 partitions := make(map[int]*Member)
185
186 bs := make([]byte, 8)
187 for partID := uint64(0); partID < c.partitionCount; partID++ {
188 binary.LittleEndian.PutUint64(bs, partID)
189 key := c.hasher.Sum64(bs)
190 idx := sort.Search(len(c.sortedSet), func(i int) bool {
191 return c.sortedSet[i] >= key
192 })
193 if idx >= len(c.sortedSet) {
194 idx = 0
195 }
196 c.distributeWithLoad(int(partID), idx, partitions, loads)
197 }
198 c.partitions = partitions
199 c.loads = loads
200}
201
202func (c *Consistent) add(member Member) {
203 for i := 0; i < c.config.ReplicationFactor; i++ {
204 key := []byte(fmt.Sprintf("%s%d", member.String(), i))
205 h := c.hasher.Sum64(key)
206 c.ring[h] = &member
207 c.sortedSet = append(c.sortedSet, h)
208 }
209 // sort hashes ascendingly
210 sort.Slice(c.sortedSet, func(i int, j int) bool {
211 return c.sortedSet[i] < c.sortedSet[j]
212 })
213 // Storing member at this map is useful to find backup members of a partition.
214 c.members[member.String()] = &member
215}
216
217// Add adds a new member to the consistent hash circle.
218func (c *Consistent) Add(member Member) {
219 c.mu.Lock()
220 defer c.mu.Unlock()
221
222 if _, ok := c.members[member.String()]; ok {
223 // We already have this member. Quit immediately.
224 return
225 }
226 c.add(member)
227 c.distributePartitions()
228}
229
230func (c *Consistent) delSlice(val uint64) {
231 for i := 0; i < len(c.sortedSet); i++ {
232 if c.sortedSet[i] == val {
233 c.sortedSet = append(c.sortedSet[:i], c.sortedSet[i+1:]...)
234 break
235 }
236 }
237}
238
239// Remove removes a member from the consistent hash circle.
240func (c *Consistent) Remove(name string) {
241 c.mu.Lock()
242 defer c.mu.Unlock()
243
244 if _, ok := c.members[name]; !ok {
245 // There is no member with that name. Quit immediately.
246 return
247 }
248
249 for i := 0; i < c.config.ReplicationFactor; i++ {
250 key := []byte(fmt.Sprintf("%s%d", name, i))
251 h := c.hasher.Sum64(key)
252 delete(c.ring, h)
253 c.delSlice(h)
254 }
255 delete(c.members, name)
256 if len(c.members) == 0 {
257 // consistent hash ring is empty now. Reset the partition table.
258 c.partitions = make(map[int]*Member)
259 return
260 }
261 c.distributePartitions()
262}
263
264// LoadDistribution exposes load distribution of members.
265func (c *Consistent) LoadDistribution() map[string]float64 {
266 c.mu.RLock()
267 defer c.mu.RUnlock()
268
269 // Create a thread-safe copy
270 res := make(map[string]float64)
271 for member, load := range c.loads {
272 res[member] = load
273 }
274 return res
275}
276
277// FindPartitionID returns partition id for given key.
278func (c *Consistent) FindPartitionID(key []byte) int {
279 hkey := c.hasher.Sum64(key)
280 return int(hkey % c.partitionCount)
281}
282
283// GetPartitionOwner returns the owner of the given partition.
284func (c *Consistent) GetPartitionOwner(partID int) Member {
285 c.mu.RLock()
286 defer c.mu.RUnlock()
287
288 member, ok := c.partitions[partID]
289 if !ok {
290 return nil
291 }
292 // Create a thread-safe copy of member and return it.
293 return *member
294}
295
296// LocateKey finds a home for given key
297func (c *Consistent) LocateKey(key []byte) Member {
298 partID := c.FindPartitionID(key)
299 return c.GetPartitionOwner(partID)
300}
301
302func (c *Consistent) getClosestN(partID, count int) ([]Member, error) {
303 c.mu.RLock()
304 defer c.mu.RUnlock()
305
306 res := []Member{}
307 if count > len(c.members) {
308 return res, ErrInsufficientMemberCount
309 }
310
311 var ownerKey uint64
312 owner := c.GetPartitionOwner(partID)
313 // Hash and sort all the names.
314 keys := []uint64{}
315 kmems := make(map[uint64]*Member)
316 for name, member := range c.members {
317 key := c.hasher.Sum64([]byte(name))
318 if name == owner.String() {
319 ownerKey = key
320 }
321 keys = append(keys, key)
322 kmems[key] = member
323 }
324 sort.Slice(keys, func(i, j int) bool {
325 return keys[i] < keys[j]
326 })
327
328 // Find the key owner
329 idx := 0
330 for idx < len(keys) {
331 if keys[idx] == ownerKey {
332 key := keys[idx]
333 res = append(res, *kmems[key])
334 break
335 }
336 idx++
337 }
338
339 // Find the closest(replica owners) members.
340 for len(res) < count {
341 idx++
342 if idx >= len(keys) {
343 idx = 0
344 }
345 key := keys[idx]
346 res = append(res, *kmems[key])
347 }
348 return res, nil
349}
350
351// GetClosestN returns the closest N member to a key in the hash ring.
352// This may be useful to find members for replication.
353func (c *Consistent) GetClosestN(key []byte, count int) ([]Member, error) {
354 partID := c.FindPartitionID(key)
355 return c.getClosestN(partID, count)
356}
357
358// GetClosestNForPartition returns the closest N member for given partition.
359// This may be useful to find members for replication.
360func (c *Consistent) GetClosestNForPartition(partID, count int) ([]Member, error) {
361 return c.getClosestN(partID, count)
362}