Matteo Scandolo | d525ae3 | 2020-04-02 17:27:29 -0700 | [diff] [blame] | 1 | // Copyright (c) 2018 Burak Sezer |
| 2 | // All rights reserved. |
| 3 | // |
| 4 | // This code is licensed under the MIT License. |
| 5 | // |
| 6 | // Permission is hereby granted, free of charge, to any person obtaining a copy |
| 7 | // of this software and associated documentation files(the "Software"), to deal |
| 8 | // in the Software without restriction, including without limitation the rights |
| 9 | // to use, copy, modify, merge, publish, distribute, sublicense, and / or sell |
| 10 | // copies of the Software, and to permit persons to whom the Software is |
| 11 | // furnished to do so, subject to the following conditions : |
| 12 | // |
| 13 | // The above copyright notice and this permission notice shall be included in |
| 14 | // all copies or substantial portions of the Software. |
| 15 | // |
| 16 | // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
| 17 | // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
| 18 | // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE |
| 19 | // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
| 20 | // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
| 21 | // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN |
| 22 | // THE SOFTWARE. |
| 23 | |
| 24 | // Package consistent provides a consistent hashing function with bounded loads. |
| 25 | // For more information about the underlying algorithm, please take a look at |
| 26 | // https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html |
| 27 | // |
| 28 | // Example Use: |
| 29 | // cfg := consistent.Config{ |
| 30 | // PartitionCount: 71, |
| 31 | // ReplicationFactor: 20, |
| 32 | // Load: 1.25, |
| 33 | // Hasher: hasher{}, |
| 34 | // } |
| 35 | // |
| 36 | // // Create a new consistent object |
| 37 | // // You may call this with a list of members |
| 38 | // // instead of adding them one by one. |
| 39 | // c := consistent.New(members, cfg) |
| 40 | // |
| 41 | // // myMember struct just needs to implement a String method. |
| 42 | // // New/Add/Remove distributes partitions among members using the algorithm |
| 43 | // // defined on Google Research Blog. |
| 44 | // c.Add(myMember) |
| 45 | // |
| 46 | // key := []byte("my-key") |
| 47 | // // LocateKey hashes the key and calculates partition ID with |
| 48 | // // this modulo operation: MOD(hash result, partition count) |
| 49 | // // The owner of the partition is already calculated by New/Add/Remove. |
// // LocateKey just returns the member that is responsible for the key.
| 51 | // member := c.LocateKey(key) |
| 52 | // |
| 53 | package consistent |
| 54 | |
| 55 | import ( |
| 56 | "encoding/binary" |
| 57 | "errors" |
| 58 | "fmt" |
| 59 | "math" |
| 60 | "sort" |
| 61 | "sync" |
| 62 | ) |
| 63 | |
// ErrInsufficientMemberCount is returned when the ring holds fewer members
// than a request needs, so the task cannot be completed.
var ErrInsufficientMemberCount = errors.New("insufficient member count")

// ErrMemberNotFound is returned when a requested member could not be found in
// the consistent hash ring.
var ErrMemberNotFound = errors.New("member could not be found in ring")
| 71 | |
// Hasher produces an unsigned 64-bit hash of a byte slice.
//
// Implementations should minimize collisions (different inputs yielding the
// same hash). Hashing runs on every key lookup, so fast functions are
// preferable; the FarmHash family is one suitable choice.
type Hasher interface {
	Sum64(data []byte) uint64
}
| 79 | |
// Member is a participant in the consistent hash ring. Its String value is the
// member's identity: it names the member's virtual nodes and keys the member
// table, so it must be unique per member.
type Member interface {
	fmt.Stringer
}
| 84 | |
| 85 | // Config represents a structure to control consistent package. |
| 86 | type Config struct { |
| 87 | // Hasher is responsible for generating unsigned, 64 bit hash of provided byte slice. |
| 88 | Hasher Hasher |
| 89 | |
| 90 | // Keys are distributed among partitions. Prime numbers are good to |
| 91 | // distribute keys uniformly. Select a big PartitionCount if you have |
| 92 | // too many keys. |
| 93 | PartitionCount int |
| 94 | |
| 95 | // Members are replicated on consistent hash ring. This number means that a member |
| 96 | // how many times replicated on the ring. |
| 97 | ReplicationFactor int |
| 98 | |
| 99 | // Load is used to calculate average load. See the code, the paper and Google's blog post to learn about it. |
| 100 | Load float64 |
| 101 | } |
| 102 | |
| 103 | // Consistent holds the information about the members of the consistent hash circle. |
| 104 | type Consistent struct { |
| 105 | mu sync.RWMutex |
| 106 | |
| 107 | config Config |
| 108 | hasher Hasher |
| 109 | sortedSet []uint64 |
| 110 | partitionCount uint64 |
| 111 | loads map[string]float64 |
| 112 | members map[string]*Member |
| 113 | partitions map[int]*Member |
| 114 | ring map[uint64]*Member |
| 115 | } |
| 116 | |
| 117 | // New creates and returns a new Consistent object. |
| 118 | func New(members []Member, config Config) *Consistent { |
| 119 | c := &Consistent{ |
| 120 | config: config, |
| 121 | members: make(map[string]*Member), |
| 122 | partitionCount: uint64(config.PartitionCount), |
| 123 | ring: make(map[uint64]*Member), |
| 124 | } |
| 125 | if config.Hasher == nil { |
| 126 | panic("Hasher cannot be nil") |
| 127 | } |
| 128 | // TODO: Check configuration here |
| 129 | c.hasher = config.Hasher |
| 130 | for _, member := range members { |
| 131 | c.add(member) |
| 132 | } |
| 133 | if members != nil { |
| 134 | c.distributePartitions() |
| 135 | } |
| 136 | return c |
| 137 | } |
| 138 | |
| 139 | // GetMembers returns a thread-safe copy of members. |
| 140 | func (c *Consistent) GetMembers() []Member { |
| 141 | c.mu.RLock() |
| 142 | defer c.mu.RUnlock() |
| 143 | |
| 144 | // Create a thread-safe copy of member list. |
| 145 | members := make([]Member, 0, len(c.members)) |
| 146 | for _, member := range c.members { |
| 147 | members = append(members, *member) |
| 148 | } |
| 149 | return members |
| 150 | } |
| 151 | |
| 152 | // AverageLoad exposes the current average load. |
| 153 | func (c *Consistent) AverageLoad() float64 { |
| 154 | avgLoad := float64(c.partitionCount/uint64(len(c.members))) * c.config.Load |
| 155 | return math.Ceil(avgLoad) |
| 156 | } |
| 157 | |
| 158 | func (c *Consistent) distributeWithLoad(partID, idx int, partitions map[int]*Member, loads map[string]float64) { |
| 159 | avgLoad := c.AverageLoad() |
| 160 | var count int |
| 161 | for { |
| 162 | count++ |
| 163 | if count >= len(c.sortedSet) { |
| 164 | // User needs to decrease partition count, increase member count or increase load factor. |
| 165 | panic("not enough room to distribute partitions") |
| 166 | } |
| 167 | i := c.sortedSet[idx] |
| 168 | member := *c.ring[i] |
| 169 | load := loads[member.String()] |
| 170 | if load+1 <= avgLoad { |
| 171 | partitions[partID] = &member |
| 172 | loads[member.String()]++ |
| 173 | return |
| 174 | } |
| 175 | idx++ |
| 176 | if idx >= len(c.sortedSet) { |
| 177 | idx = 0 |
| 178 | } |
| 179 | } |
| 180 | } |
| 181 | |
| 182 | func (c *Consistent) distributePartitions() { |
| 183 | loads := make(map[string]float64) |
| 184 | partitions := make(map[int]*Member) |
| 185 | |
| 186 | bs := make([]byte, 8) |
| 187 | for partID := uint64(0); partID < c.partitionCount; partID++ { |
| 188 | binary.LittleEndian.PutUint64(bs, partID) |
| 189 | key := c.hasher.Sum64(bs) |
| 190 | idx := sort.Search(len(c.sortedSet), func(i int) bool { |
| 191 | return c.sortedSet[i] >= key |
| 192 | }) |
| 193 | if idx >= len(c.sortedSet) { |
| 194 | idx = 0 |
| 195 | } |
| 196 | c.distributeWithLoad(int(partID), idx, partitions, loads) |
| 197 | } |
| 198 | c.partitions = partitions |
| 199 | c.loads = loads |
| 200 | } |
| 201 | |
| 202 | func (c *Consistent) add(member Member) { |
| 203 | for i := 0; i < c.config.ReplicationFactor; i++ { |
| 204 | key := []byte(fmt.Sprintf("%s%d", member.String(), i)) |
| 205 | h := c.hasher.Sum64(key) |
| 206 | c.ring[h] = &member |
| 207 | c.sortedSet = append(c.sortedSet, h) |
| 208 | } |
| 209 | // sort hashes ascendingly |
| 210 | sort.Slice(c.sortedSet, func(i int, j int) bool { |
| 211 | return c.sortedSet[i] < c.sortedSet[j] |
| 212 | }) |
| 213 | // Storing member at this map is useful to find backup members of a partition. |
| 214 | c.members[member.String()] = &member |
| 215 | } |
| 216 | |
| 217 | // Add adds a new member to the consistent hash circle. |
| 218 | func (c *Consistent) Add(member Member) { |
| 219 | c.mu.Lock() |
| 220 | defer c.mu.Unlock() |
| 221 | |
| 222 | if _, ok := c.members[member.String()]; ok { |
| 223 | // We already have this member. Quit immediately. |
| 224 | return |
| 225 | } |
| 226 | c.add(member) |
| 227 | c.distributePartitions() |
| 228 | } |
| 229 | |
| 230 | func (c *Consistent) delSlice(val uint64) { |
| 231 | for i := 0; i < len(c.sortedSet); i++ { |
| 232 | if c.sortedSet[i] == val { |
| 233 | c.sortedSet = append(c.sortedSet[:i], c.sortedSet[i+1:]...) |
| 234 | break |
| 235 | } |
| 236 | } |
| 237 | } |
| 238 | |
| 239 | // Remove removes a member from the consistent hash circle. |
| 240 | func (c *Consistent) Remove(name string) { |
| 241 | c.mu.Lock() |
| 242 | defer c.mu.Unlock() |
| 243 | |
| 244 | if _, ok := c.members[name]; !ok { |
| 245 | // There is no member with that name. Quit immediately. |
| 246 | return |
| 247 | } |
| 248 | |
| 249 | for i := 0; i < c.config.ReplicationFactor; i++ { |
| 250 | key := []byte(fmt.Sprintf("%s%d", name, i)) |
| 251 | h := c.hasher.Sum64(key) |
| 252 | delete(c.ring, h) |
| 253 | c.delSlice(h) |
| 254 | } |
| 255 | delete(c.members, name) |
| 256 | if len(c.members) == 0 { |
| 257 | // consistent hash ring is empty now. Reset the partition table. |
| 258 | c.partitions = make(map[int]*Member) |
| 259 | return |
| 260 | } |
| 261 | c.distributePartitions() |
| 262 | } |
| 263 | |
| 264 | // LoadDistribution exposes load distribution of members. |
| 265 | func (c *Consistent) LoadDistribution() map[string]float64 { |
| 266 | c.mu.RLock() |
| 267 | defer c.mu.RUnlock() |
| 268 | |
| 269 | // Create a thread-safe copy |
| 270 | res := make(map[string]float64) |
| 271 | for member, load := range c.loads { |
| 272 | res[member] = load |
| 273 | } |
| 274 | return res |
| 275 | } |
| 276 | |
| 277 | // FindPartitionID returns partition id for given key. |
| 278 | func (c *Consistent) FindPartitionID(key []byte) int { |
| 279 | hkey := c.hasher.Sum64(key) |
| 280 | return int(hkey % c.partitionCount) |
| 281 | } |
| 282 | |
| 283 | // GetPartitionOwner returns the owner of the given partition. |
| 284 | func (c *Consistent) GetPartitionOwner(partID int) Member { |
| 285 | c.mu.RLock() |
| 286 | defer c.mu.RUnlock() |
| 287 | |
| 288 | member, ok := c.partitions[partID] |
| 289 | if !ok { |
| 290 | return nil |
| 291 | } |
| 292 | // Create a thread-safe copy of member and return it. |
| 293 | return *member |
| 294 | } |
| 295 | |
| 296 | // LocateKey finds a home for given key |
| 297 | func (c *Consistent) LocateKey(key []byte) Member { |
| 298 | partID := c.FindPartitionID(key) |
| 299 | return c.GetPartitionOwner(partID) |
| 300 | } |
| 301 | |
| 302 | func (c *Consistent) getClosestN(partID, count int) ([]Member, error) { |
| 303 | c.mu.RLock() |
| 304 | defer c.mu.RUnlock() |
| 305 | |
| 306 | res := []Member{} |
| 307 | if count > len(c.members) { |
| 308 | return res, ErrInsufficientMemberCount |
| 309 | } |
| 310 | |
| 311 | var ownerKey uint64 |
| 312 | owner := c.GetPartitionOwner(partID) |
| 313 | // Hash and sort all the names. |
| 314 | keys := []uint64{} |
| 315 | kmems := make(map[uint64]*Member) |
| 316 | for name, member := range c.members { |
| 317 | key := c.hasher.Sum64([]byte(name)) |
| 318 | if name == owner.String() { |
| 319 | ownerKey = key |
| 320 | } |
| 321 | keys = append(keys, key) |
| 322 | kmems[key] = member |
| 323 | } |
| 324 | sort.Slice(keys, func(i, j int) bool { |
| 325 | return keys[i] < keys[j] |
| 326 | }) |
| 327 | |
| 328 | // Find the key owner |
| 329 | idx := 0 |
| 330 | for idx < len(keys) { |
| 331 | if keys[idx] == ownerKey { |
| 332 | key := keys[idx] |
| 333 | res = append(res, *kmems[key]) |
| 334 | break |
| 335 | } |
| 336 | idx++ |
| 337 | } |
| 338 | |
| 339 | // Find the closest(replica owners) members. |
| 340 | for len(res) < count { |
| 341 | idx++ |
| 342 | if idx >= len(keys) { |
| 343 | idx = 0 |
| 344 | } |
| 345 | key := keys[idx] |
| 346 | res = append(res, *kmems[key]) |
| 347 | } |
| 348 | return res, nil |
| 349 | } |
| 350 | |
| 351 | // GetClosestN returns the closest N member to a key in the hash ring. |
| 352 | // This may be useful to find members for replication. |
| 353 | func (c *Consistent) GetClosestN(key []byte, count int) ([]Member, error) { |
| 354 | partID := c.FindPartitionID(key) |
| 355 | return c.getClosestN(partID, count) |
| 356 | } |
| 357 | |
| 358 | // GetClosestNForPartition returns the closest N member for given partition. |
| 359 | // This may be useful to find members for replication. |
| 360 | func (c *Consistent) GetClosestNForPartition(partID, count int) ([]Member, error) { |
| 361 | return c.getClosestN(partID, count) |
| 362 | } |