// Copyright (c) 2018 Burak Sezer
// All rights reserved.
//
// This code is licensed under the MIT License.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

// Package consistent provides a consistent hashing function with bounded loads.
// For more information about the underlying algorithm, please take a look at
// https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html
//
// Example Use:
//
//	cfg := consistent.Config{
//		PartitionCount:    71,
//		ReplicationFactor: 20,
//		Load:              1.25,
//		Hasher:            hasher{},
//	}
//
//	// Create a new consistent object.
//	// You may call this with a list of members
//	// instead of adding them one by one.
//	c := consistent.New(members, cfg)
//
//	// The myMember struct just needs to implement a String method.
//	// New/Add/Remove distributes partitions among members using the algorithm
//	// described on the Google Research blog.
//	c.Add(myMember)
//
//	key := []byte("my-key")
//	// LocateKey hashes the key and calculates the partition ID with
//	// this modulo operation: MOD(hash result, partition count).
//	// The owner of the partition is already calculated by New/Add/Remove.
//	// LocateKey just returns the member that is responsible for the key.
//	member := c.LocateKey(key)
package consistent

import (
	"encoding/binary"
	"errors"
	"fmt"
	"math"
	"sort"
	"sync"
)

var (
	// ErrInsufficientMemberCount is returned when there are not enough members
	// in the ring to complete the requested task.
	ErrInsufficientMemberCount = errors.New("insufficient member count")

	// ErrMemberNotFound is returned when the requested member cannot be found
	// in the consistent hash ring.
	ErrMemberNotFound = errors.New("member could not be found in ring")
)

// Hasher is responsible for generating an unsigned, 64-bit hash of the provided
// byte slice. Hasher should minimize collisions (generating the same hash for
// different byte slices), and fast hash functions are preferable; the FarmHash
// family, for example, works well.
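//
// As a minimal illustration (not part of this package), a Hasher backed by the
// standard library's hash/fnv could look like:
//
//	type fnvHasher struct{}
//
//	func (fnvHasher) Sum64(data []byte) uint64 {
//		h := fnv.New64a()
//		h.Write(data)
//		return h.Sum64()
//	}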
type Hasher interface {
	Sum64([]byte) uint64
}

// Member represents a member in the consistent hash ring. Any type that
// implements a String method satisfies this interface.
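//
// For example, the myMember struct mentioned in the package documentation
// could be as simple as:
//
//	type myMember struct {
//		name string
//	}
//
//	func (m myMember) String() string { return m.name }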
type Member interface {
	String() string
}

// Config represents the configuration that controls the consistent package.
type Config struct {
	// Hasher is responsible for generating an unsigned, 64-bit hash of the
	// provided byte slice.
	Hasher Hasher

	// Keys are distributed among partitions. Prime numbers are good for
	// distributing keys uniformly. Select a bigger PartitionCount if you have
	// a large number of keys.
	PartitionCount int

	// ReplicationFactor determines how many times a member is replicated
	// (as virtual nodes) on the consistent hash ring.
	ReplicationFactor int

	// Load is the load-balancing factor used to calculate the maximum average
	// load per member. See the code, the paper and Google's blog post to learn
	// about it.
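	//
	// With the current implementation (see AverageLoad), a member's load is
	// capped at:
	//
	//	ceil((PartitionCount / number of members) * Load)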
	Load float64
}

// Consistent holds the information about the members of the consistent hash circle.
type Consistent struct {
	mu sync.RWMutex

	config         Config
	hasher         Hasher
	sortedSet      []uint64
	partitionCount uint64
	loads          map[string]float64
	members        map[string]*Member
	partitions     map[int]*Member
	ring           map[uint64]*Member
}

// New creates and returns a new Consistent object.
func New(members []Member, config Config) *Consistent {
	c := &Consistent{
		config:         config,
		members:        make(map[string]*Member),
		partitionCount: uint64(config.PartitionCount),
		ring:           make(map[uint64]*Member),
	}
	if config.Hasher == nil {
		panic("Hasher cannot be nil")
	}
	// TODO: Check configuration here
	c.hasher = config.Hasher
	for _, member := range members {
		c.add(member)
	}
	if members != nil {
		c.distributePartitions()
	}
	return c
}

// GetMembers returns a thread-safe copy of members.
func (c *Consistent) GetMembers() []Member {
	c.mu.RLock()
	defer c.mu.RUnlock()

	// Create a thread-safe copy of member list.
	members := make([]Member, 0, len(c.members))
	for _, member := range c.members {
		members = append(members, *member)
	}
	return members
}

// AverageLoad exposes the current average load.
func (c *Consistent) AverageLoad() float64 {
	if len(c.members) == 0 {
		return 0
	}
	avgLoad := float64(c.partitionCount/uint64(len(c.members))) * c.config.Load
	return math.Ceil(avgLoad)
}

func (c *Consistent) distributeWithLoad(partID, idx int, partitions map[int]*Member, loads map[string]float64) {
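	// Walk the ring clockwise starting from idx and assign partID to the first
	// member whose load stays at or below the average load; this is the
	// bounded-loads rule described in the blog post referenced in the package
	// documentation.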
	avgLoad := c.AverageLoad()
	var count int
	for {
		count++
		if count >= len(c.sortedSet) {
			// User needs to decrease partition count, increase member count or increase load factor.
			panic("not enough room to distribute partitions")
		}
		i := c.sortedSet[idx]
		member := *c.ring[i]
		load := loads[member.String()]
		if load+1 <= avgLoad {
			partitions[partID] = &member
			loads[member.String()]++
			return
		}
		idx++
		if idx >= len(c.sortedSet) {
			idx = 0
		}
	}
}

func (c *Consistent) distributePartitions() {
	loads := make(map[string]float64)
	partitions := make(map[int]*Member)

	bs := make([]byte, 8)
	for partID := uint64(0); partID < c.partitionCount; partID++ {
		binary.LittleEndian.PutUint64(bs, partID)
		key := c.hasher.Sum64(bs)
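		// Find the first point on the ring that is greater than or equal to the
		// partition's hash; wrap around to the beginning of the ring if needed.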
		idx := sort.Search(len(c.sortedSet), func(i int) bool {
			return c.sortedSet[i] >= key
		})
		if idx >= len(c.sortedSet) {
			idx = 0
		}
		c.distributeWithLoad(int(partID), idx, partitions, loads)
	}
	c.partitions = partitions
	c.loads = loads
}

func (c *Consistent) add(member Member) {
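	// Place the member on the ring ReplicationFactor times (as virtual nodes)
	// to smooth out the distribution of partitions among members.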
	for i := 0; i < c.config.ReplicationFactor; i++ {
		key := []byte(fmt.Sprintf("%s%d", member.String(), i))
		h := c.hasher.Sum64(key)
		c.ring[h] = &member
		c.sortedSet = append(c.sortedSet, h)
	}
	// Sort hashes in ascending order.
	sort.Slice(c.sortedSet, func(i int, j int) bool {
		return c.sortedSet[i] < c.sortedSet[j]
	})
	// Storing the member in this map is useful for finding backup members of a partition.
	c.members[member.String()] = &member
}

// Add adds a new member to the consistent hash circle.
func (c *Consistent) Add(member Member) {
	c.mu.Lock()
	defer c.mu.Unlock()

	if _, ok := c.members[member.String()]; ok {
		// We already have this member. Quit immediately.
		return
	}
	c.add(member)
	c.distributePartitions()
}

func (c *Consistent) delSlice(val uint64) {
	for i := 0; i < len(c.sortedSet); i++ {
		if c.sortedSet[i] == val {
			c.sortedSet = append(c.sortedSet[:i], c.sortedSet[i+1:]...)
			break
		}
	}
}

// Remove removes a member from the consistent hash circle.
func (c *Consistent) Remove(name string) {
	c.mu.Lock()
	defer c.mu.Unlock()

	if _, ok := c.members[name]; !ok {
		// There is no member with that name. Quit immediately.
		return
	}

	for i := 0; i < c.config.ReplicationFactor; i++ {
		key := []byte(fmt.Sprintf("%s%d", name, i))
		h := c.hasher.Sum64(key)
		delete(c.ring, h)
		c.delSlice(h)
	}
	delete(c.members, name)
	if len(c.members) == 0 {
		// Consistent hash ring is empty now. Reset the partition table.
		c.partitions = make(map[int]*Member)
		return
	}
	c.distributePartitions()
}

// LoadDistribution exposes load distribution of members.
func (c *Consistent) LoadDistribution() map[string]float64 {
	c.mu.RLock()
	defer c.mu.RUnlock()

	// Create a thread-safe copy.
	res := make(map[string]float64)
	for member, load := range c.loads {
		res[member] = load
	}
	return res
}

// FindPartitionID returns the partition ID for the given key.
func (c *Consistent) FindPartitionID(key []byte) int {
	hkey := c.hasher.Sum64(key)
	return int(hkey % c.partitionCount)
}

// GetPartitionOwner returns the owner of the given partition.
func (c *Consistent) GetPartitionOwner(partID int) Member {
	c.mu.RLock()
	defer c.mu.RUnlock()

	member, ok := c.partitions[partID]
	if !ok {
		return nil
	}
	// Create a thread-safe copy of member and return it.
	return *member
}

// LocateKey finds a home for the given key.
func (c *Consistent) LocateKey(key []byte) Member {
	partID := c.FindPartitionID(key)
	return c.GetPartitionOwner(partID)
}

func (c *Consistent) getClosestN(partID, count int) ([]Member, error) {
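	// Hash every member name, sort the hashes, locate the owner of the given
	// partition and then walk clockwise, collecting count members in total.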
	c.mu.RLock()
	defer c.mu.RUnlock()

	res := []Member{}
	if count > len(c.members) {
		return res, ErrInsufficientMemberCount
	}

	var ownerKey uint64
	// Read the partition table directly instead of calling GetPartitionOwner:
	// the read lock is already held and sync.RWMutex must not be acquired
	// recursively.
	owner := *c.partitions[partID]
	// Hash and sort all the names.
	keys := []uint64{}
	kmems := make(map[uint64]*Member)
	for name, member := range c.members {
		key := c.hasher.Sum64([]byte(name))
		if name == owner.String() {
			ownerKey = key
		}
		keys = append(keys, key)
		kmems[key] = member
	}
	sort.Slice(keys, func(i, j int) bool {
		return keys[i] < keys[j]
	})

	// Find the key owner.
	idx := 0
	for idx < len(keys) {
		if keys[idx] == ownerKey {
			key := keys[idx]
			res = append(res, *kmems[key])
			break
		}
		idx++
	}

	// Find the closest (replica owner) members.
	for len(res) < count {
		idx++
		if idx >= len(keys) {
			idx = 0
		}
		key := keys[idx]
		res = append(res, *kmems[key])
	}
	return res, nil
}

// GetClosestN returns the closest N members to a key in the hash ring.
// This may be useful for finding members for replication.
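//
// For example, assuming the ring has at least three members:
//
//	owners, err := c.GetClosestN([]byte("my-key"), 3)
//	if err != nil {
//		// Not enough members in the ring (ErrInsufficientMemberCount).
//	}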
func (c *Consistent) GetClosestN(key []byte, count int) ([]Member, error) {
	partID := c.FindPartitionID(key)
	return c.getClosestN(partID, count)
}

// GetClosestNForPartition returns the closest N members for the given partition.
// This may be useful for finding members for replication.
func (c *Consistent) GetClosestNForPartition(partID, count int) ([]Member, error) {
	return c.getClosestN(partID, count)
}