[VOL-2835] Using different topic per ONU device
Change-Id: I3e55064292f28f9bf39ad6bc75fd5758f5313317
diff --git a/vendor/github.com/buraksezer/consistent/.gitignore b/vendor/github.com/buraksezer/consistent/.gitignore
new file mode 100644
index 0000000..a1338d6
--- /dev/null
+++ b/vendor/github.com/buraksezer/consistent/.gitignore
@@ -0,0 +1,14 @@
+# Binaries for programs and plugins
+*.exe
+*.dll
+*.so
+*.dylib
+
+# Test binary, build with `go test -c`
+*.test
+
+# Output of the go coverage tool, specifically when used with LiteIDE
+*.out
+
+# Project-local glide cache, RE: https://github.com/Masterminds/glide/issues/736
+.glide/
diff --git a/vendor/github.com/buraksezer/consistent/.travis.yml b/vendor/github.com/buraksezer/consistent/.travis.yml
new file mode 100644
index 0000000..4f2ee4d
--- /dev/null
+++ b/vendor/github.com/buraksezer/consistent/.travis.yml
@@ -0,0 +1 @@
+language: go
diff --git a/vendor/github.com/buraksezer/consistent/LICENSE b/vendor/github.com/buraksezer/consistent/LICENSE
new file mode 100644
index 0000000..e470334
--- /dev/null
+++ b/vendor/github.com/buraksezer/consistent/LICENSE
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2018 Burak Sezer
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/vendor/github.com/buraksezer/consistent/README.md b/vendor/github.com/buraksezer/consistent/README.md
new file mode 100644
index 0000000..bde53d1
--- /dev/null
+++ b/vendor/github.com/buraksezer/consistent/README.md
@@ -0,0 +1,235 @@
+consistent
+==========
+[![GoDoc](http://img.shields.io/badge/godoc-reference-blue.svg?style=flat)](https://godoc.org/github.com/buraksezer/consistent) [![Build Status](https://travis-ci.org/buraksezer/consistent.svg?branch=master)](https://travis-ci.org/buraksezer/consistent) [![Coverage](http://gocover.io/_badge/github.com/buraksezer/consistent)](http://gocover.io/github.com/buraksezer/consistent) [![Go Report Card](https://goreportcard.com/badge/github.com/buraksezer/consistent)](https://goreportcard.com/report/github.com/buraksezer/consistent) [![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT) [![Mentioned in Awesome Go](https://awesome.re/mentioned-badge.svg)](https://github.com/avelino/awesome-go)
+
+
+This library provides a consistent hashing function which simultaneously achieves both uniformity and consistency.
+
+For detailed information about the concept, you should take a look at the following resources:
+
+* [Consistent Hashing with Bounded Loads on Google Research Blog](https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html)
+* [Improving load balancing with a new consistent-hashing algorithm on Vimeo Engineering Blog](https://medium.com/vimeo-engineering-blog/improving-load-balancing-with-a-new-consistent-hashing-algorithm-9f1bd75709ed)
+* [Consistent Hashing with Bounded Loads paper on arXiv](https://arxiv.org/abs/1608.01350)
+
+Table of Contents
+-----------------
+
+- [Overview](#overview)
+- [Install](#install)
+- [Configuration](#configuration)
+- [Usage](#usage)
+- [Benchmarks](#benchmarks)
+- [Examples](#examples)
+
+Overview
+--------
+
+In this package's context, the keys are distributed among partitions and partitions are distributed among members as well.
+
+When you create a new consistent instance or call `Add/Remove`:
+
+* The member's name is hashed and inserted into the hash ring,
+* Average load is calculated by the algorithm defined in the paper,
+* Partitions are distributed among members by hashing partition IDs and none of them exceed the average load.
+
+Average load cannot be exceeded. So if all members are loaded at the maximum while trying to add a new member, it panics.
+
+When you want to locate a key by calling `LocateKey`:
+
+* The key(byte slice) is hashed,
+* The hash result is reduced modulo the number of partitions,
+* The result of this modulo - `MOD(hash result, partition count)` - is the partition in which the key will be located,
+* Owner of the partition is already determined before calling `LocateKey`. So it returns the partition owner immediately.
+
+No memory is allocated by `consistent` except hashing when you want to locate a key.
+
+Note that the number of partitions cannot be changed after creation.
+
+Install
+-------
+
+With a correctly configured Go environment:
+
+```
+go get github.com/buraksezer/consistent
+```
+
+You will find some useful usage samples in [examples](https://github.com/buraksezer/consistent/tree/master/_examples) folder.
+
+Configuration
+-------------
+
+```go
+type Config struct {
+ // Hasher is responsible for generating unsigned, 64 bit hash of provided byte slice.
+ Hasher Hasher
+
+ // Keys are distributed among partitions. Prime numbers are good to
+ // distribute keys uniformly. Select a big PartitionCount if you have
+ // too many keys.
+ PartitionCount int
+
+ // Members are replicated on consistent hash ring. This number controls
+ // the number each member is replicated on the ring.
+ ReplicationFactor int
+
+ // Load is used to calculate average load. See the code, the paper and Google's
+ // blog post to learn about it.
+ Load float64
+}
+```
+
+Any hash algorithm can be used as the hasher, as long as it implements the `Hasher` interface. Please take a look at the [Examples](#examples) section for an example.
+
+Usage
+-----
+
+`LocateKey` function finds a member in the cluster for your key:
+```go
+// With a properly configured and initialized consistent instance
+key := []byte("my-key")
+member := c.LocateKey(key)
+```
+It returns a thread-safe copy of the member you added before.
+
+The second most frequently used function is `GetClosestN`.
+
+```go
+// With a properly configured and initialized consistent instance
+
+key := []byte("my-key")
+members, err := c.GetClosestN(key, 2)
+```
+
+This may be useful to find backup nodes to store your key.
+
+Benchmarks
+----------
+On an early 2015 Macbook:
+
+```
+BenchmarkAddRemove-4 100000 22006 ns/op
+BenchmarkLocateKey-4 5000000 252 ns/op
+BenchmarkGetClosestN-4 500000 2974 ns/op
+```
+
+Examples
+--------
+
+The most basic use of consistent package should be like this. For detailed list of functions, [visit godoc.org.](https://godoc.org/github.com/buraksezer/consistent)
+More sample code can be found under [_examples](https://github.com/buraksezer/consistent/tree/master/_examples).
+
+```go
+package main
+
+import (
+ "fmt"
+
+ "github.com/buraksezer/consistent"
+ "github.com/cespare/xxhash"
+)
+
+// In your code, you probably have a custom data type
+// for your cluster members. Just add a String function to implement
+// consistent.Member interface.
+type myMember string
+
+func (m myMember) String() string {
+ return string(m)
+}
+
+// consistent package doesn't provide a default hashing function.
+// You should provide a proper one to distribute keys/members uniformly.
+type hasher struct{}
+
+func (h hasher) Sum64(data []byte) uint64 {
+ // you should use a proper hash function for uniformity.
+ return xxhash.Sum64(data)
+}
+
+func main() {
+ // Create a new consistent instance
+ cfg := consistent.Config{
+ PartitionCount: 7,
+ ReplicationFactor: 20,
+ Load: 1.25,
+ Hasher: hasher{},
+ }
+ c := consistent.New(nil, cfg)
+
+ // Add some members to the consistent hash table.
+ // Add function calculates average load and distributes partitions over members
+ node1 := myMember("node1.olric.com")
+ c.Add(node1)
+
+ node2 := myMember("node2.olric.com")
+ c.Add(node2)
+
+ key := []byte("my-key")
+ // calculates partition id for the given key
+ // partID := hash(key) % partitionCount
+ // the partitions are already distributed among members by Add function.
+ owner := c.LocateKey(key)
+ fmt.Println(owner.String())
+ // Prints node2.olric.com
+}
+```
+
+Another useful example is `_examples/relocation_percentage.go`. It creates a `consistent` object with 8 members and distributes partitions among them. It then adds a 9th member;
+here is the result with a proper configuration and hash function:
+
+```
+bloom:consistent burak$ go run _examples/relocation_percentage.go
+partID: 218 moved to node2.olric from node0.olric
+partID: 173 moved to node9.olric from node3.olric
+partID: 225 moved to node7.olric from node0.olric
+partID: 85 moved to node9.olric from node7.olric
+partID: 220 moved to node5.olric from node0.olric
+partID: 33 moved to node9.olric from node5.olric
+partID: 254 moved to node9.olric from node4.olric
+partID: 71 moved to node9.olric from node3.olric
+partID: 236 moved to node9.olric from node2.olric
+partID: 118 moved to node9.olric from node3.olric
+partID: 233 moved to node3.olric from node0.olric
+partID: 50 moved to node9.olric from node4.olric
+partID: 252 moved to node9.olric from node2.olric
+partID: 121 moved to node9.olric from node2.olric
+partID: 259 moved to node9.olric from node4.olric
+partID: 92 moved to node9.olric from node7.olric
+partID: 152 moved to node9.olric from node3.olric
+partID: 105 moved to node9.olric from node2.olric
+
+6% of the partitions are relocated
+```
+
+The number of moved partitions is highly dependent on your configuration and the quality of the hash function. You should tune the configuration to find the optimum
+for your system.
+
+`_examples/load_distribution.go` is also useful to understand load distribution. It creates a `consistent` object with 8 members and locates 1M keys. It also calculates the average
+load which cannot be exceeded by any member. Here is the result:
+
+```
+Maximum key count for a member should be around this: 147602
+member: node2.olric, key count: 100362
+member: node5.olric, key count: 99448
+member: node0.olric, key count: 147735
+member: node3.olric, key count: 103455
+member: node6.olric, key count: 147069
+member: node1.olric, key count: 121566
+member: node4.olric, key count: 147932
+member: node7.olric, key count: 132433
+```
+
+Average load can be calculated by using the following formula:
+
+```
+load := (consistent.AverageLoad() * float64(keyCount)) / float64(config.PartitionCount)
+```
+
+Contributions
+-------------
+Please don't hesitate to fork the project and send a pull request or just e-mail me to ask questions and share ideas.
+
+License
+-------
+MIT License - see LICENSE for more details.
diff --git a/vendor/github.com/buraksezer/consistent/consistent.go b/vendor/github.com/buraksezer/consistent/consistent.go
new file mode 100644
index 0000000..a1446d6
--- /dev/null
+++ b/vendor/github.com/buraksezer/consistent/consistent.go
@@ -0,0 +1,362 @@
+// Copyright (c) 2018 Burak Sezer
+// All rights reserved.
+//
+// This code is licensed under the MIT License.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files(the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions :
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+// Package consistent provides a consistent hashing function with bounded loads.
+// For more information about the underlying algorithm, please take a look at
+// https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html
+//
+// Example Use:
+// cfg := consistent.Config{
+// PartitionCount: 71,
+// ReplicationFactor: 20,
+// Load: 1.25,
+// Hasher: hasher{},
+// }
+//
+// // Create a new consistent object
+// // You may call this with a list of members
+// // instead of adding them one by one.
+// c := consistent.New(members, cfg)
+//
+// // myMember struct just needs to implement a String method.
+// // New/Add/Remove distributes partitions among members using the algorithm
+// // defined on Google Research Blog.
+// c.Add(myMember)
+//
+// key := []byte("my-key")
+// // LocateKey hashes the key and calculates partition ID with
+// // this modulo operation: MOD(hash result, partition count)
+// // The owner of the partition is already calculated by New/Add/Remove.
+// // LocateKey just returns the member which is responsible for the key.
+// member := c.LocateKey(key)
+//
+package consistent
+
+import (
+ "encoding/binary"
+ "errors"
+ "fmt"
+ "math"
+ "sort"
+ "sync"
+)
+
+var (
+ // ErrInsufficientMemberCount is returned when more members are requested than the ring currently holds.
+ ErrInsufficientMemberCount = errors.New("insufficient member count")
+
+ // ErrMemberNotFound means that a requested member could not be found in the consistent hash ring.
+ ErrMemberNotFound = errors.New("member could not be found in ring")
+)
+
+// Hasher is responsible for generating an unsigned, 64-bit hash of the provided byte slice.
+// A good Hasher minimizes collisions (the same hash for different byte slices) and is
+// fast, because it is invoked for every key lookup and for every ring update (for
+// example, you can use the FarmHash family).
+type Hasher interface {
+ Sum64([]byte) uint64
+}
+
+// Member represents a member of the consistent hash ring.
+type Member interface {
+ String() string // name of the member; used as the member-map key and to derive its ring hashes
+}
+
+// Config holds the settings of a Consistent instance. New only rejects a nil Hasher; other fields are not validated.
+type Config struct {
+ // Hasher generates the unsigned, 64-bit hashes of keys, partition IDs and member names. New panics if it is nil.
+ Hasher Hasher
+
+ // PartitionCount is the number of partitions the keys are distributed among.
+ // Prime numbers are good to distribute keys uniformly; select a big value if you
+ // have many keys. FindPartitionID divides by it, so it must not be zero.
+ PartitionCount int
+
+ // ReplicationFactor is the number of times each member is placed (replicated)
+ // on the consistent hash ring.
+ ReplicationFactor int
+
+ // Load is the factor AverageLoad applies to partitions-per-member to bound a member's load. See the paper and Google's blog post.
+ Load float64
+}
+
+// Consistent holds the information about the members of the consistent hash circle.
+type Consistent struct {
+ mu sync.RWMutex // write-locked by Add/Remove, read-locked by the Get*/LoadDistribution accessors
+
+ config Config // the Config passed to New
+ hasher Hasher // config.Hasher
+ sortedSet []uint64 // hashes of all virtual nodes on the ring, kept in ascending order
+ partitionCount uint64 // uint64(config.PartitionCount)
+ loads map[string]float64 // member name -> number of partitions it owns
+ members map[string]*Member // member name -> member
+ partitions map[int]*Member // partition ID -> owning member
+ ring map[uint64]*Member // virtual node hash -> member
+}
+
+// New creates and returns a new Consistent object. It panics if config.Hasher is nil.
+func New(members []Member, config Config) *Consistent {
+	if config.Hasher == nil {
+		panic("Hasher cannot be nil")
+	}
+	// TODO: Check configuration here
+	c := &Consistent{
+		config:         config,
+		hasher:         config.Hasher,
+		members:        make(map[string]*Member),
+		partitionCount: uint64(config.PartitionCount),
+		ring:           make(map[uint64]*Member),
+	}
+	for _, member := range members {
+		c.add(member)
+	}
+	if len(members) > 0 { // an empty ring has nothing to distribute; trying would divide by zero
+		c.distributePartitions()
+	}
+	return c
+}
+
+// GetMembers returns a snapshot of the members currently on the ring.
+func (c *Consistent) GetMembers() []Member {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+
+	// Dereference the stored pointers so callers get values in a slice of their own.
+	snapshot := make([]Member, 0, len(c.members))
+	for name := range c.members {
+		snapshot = append(snapshot, *c.members[name])
+	}
+	return snapshot
+}
+
+// AverageLoad exposes the current average load: ceil((PartitionCount / member count) * Load), with an integer division.
+// It takes no lock and panics with a division by zero when the ring has no members.
+func (c *Consistent) AverageLoad() float64 {
+	return math.Ceil(float64(c.partitionCount/uint64(len(c.members))) * c.config.Load)
+}
+
+func (c *Consistent) distributeWithLoad(partID, idx int, partitions map[int]*Member, loads map[string]float64) {
+ avgLoad := c.AverageLoad()
+ var count int
+ for {
+ count++
+ if count >= len(c.sortedSet) {
+ // User needs to decrease partition count, increase member count or increase load factor.
+ panic("not enough room to distribute partitions")
+ }
+ i := c.sortedSet[idx]
+ member := *c.ring[i]
+ load := loads[member.String()]
+ if load+1 <= avgLoad {
+ partitions[partID] = &member
+ loads[member.String()]++
+ return
+ }
+ idx++
+ if idx >= len(c.sortedSet) {
+ idx = 0
+ }
+ }
+}
+
+// distributePartitions rebuilds the partition table and the per-member loads from scratch.
+func (c *Consistent) distributePartitions() {
+	owners := make(map[int]*Member)
+	loads := make(map[string]float64)
+	buf := make([]byte, 8)
+	for partID := uint64(0); partID < c.partitionCount; partID++ {
+		// A partition is placed on the ring by hashing its little-endian ID.
+		binary.LittleEndian.PutUint64(buf, partID)
+		h := c.hasher.Sum64(buf)
+		start := sort.Search(len(c.sortedSet), func(pos int) bool {
+			return c.sortedSet[pos] >= h
+		})
+		if start >= len(c.sortedSet) {
+			start = 0 // the hash lies past the last ring entry: wrap around
+		}
+		c.distributeWithLoad(int(partID), start, owners, loads)
+	}
+	c.partitions, c.loads = owners, loads
+}
+
+// add places ReplicationFactor virtual nodes for member on the ring and registers
+// the member under its name. It does not touch the partition table: New and Add
+// call distributePartitions afterwards.
+func (c *Consistent) add(member Member) {
+	for replica := 0; replica < c.config.ReplicationFactor; replica++ {
+		h := c.hasher.Sum64([]byte(fmt.Sprintf("%s%d", member.String(), replica)))
+		c.ring[h] = &member
+		c.sortedSet = append(c.sortedSet, h)
+	}
+	// Keep the hashes in ascending order for the binary search in distributePartitions.
+	sort.Slice(c.sortedSet, func(a, b int) bool { return c.sortedSet[a] < c.sortedSet[b] })
+	// Storing member at this map is useful to find backup members of a partition.
+	c.members[member.String()] = &member
+}
+
+// Add inserts member into the consistent hash circle and redistributes the
+// partitions. Adding a member whose name is already present is a no-op.
+func (c *Consistent) Add(member Member) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	_, exists := c.members[member.String()]
+	if !exists {
+		c.add(member)
+		c.distributePartitions()
+	}
+}
+
+// delSlice removes one occurrence of val from sortedSet, if present. sortedSet is
+// kept in ascending order by add, so the position is found with a binary search.
+func (c *Consistent) delSlice(val uint64) {
+	i := sort.Search(len(c.sortedSet), func(k int) bool { return c.sortedSet[k] >= val })
+	if i < len(c.sortedSet) && c.sortedSet[i] == val {
+		c.sortedSet = append(c.sortedSet[:i], c.sortedSet[i+1:]...)
+	}
+}
+
+// Remove deletes the member called name from the consistent hash circle and
+// redistributes the partitions among the remaining members. Unknown names are ignored.
+func (c *Consistent) Remove(name string) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	if _, known := c.members[name]; !known {
+		return
+	}
+	// Recompute the same virtual node hashes that add generated for this name.
+	for replica := 0; replica < c.config.ReplicationFactor; replica++ {
+		h := c.hasher.Sum64([]byte(fmt.Sprintf("%s%d", name, replica)))
+		delete(c.ring, h)
+		c.delSlice(h)
+	}
+	delete(c.members, name)
+	if len(c.members) > 0 {
+		c.distributePartitions()
+		return
+	}
+	// The ring is empty now: reset the partition table and the now stale load figures.
+	c.partitions = make(map[int]*Member)
+	c.loads = make(map[string]float64)
+}
+
+// LoadDistribution reports how many partitions each member currently owns.
+func (c *Consistent) LoadDistribution() map[string]float64 {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+
+	// Hand out a copy so callers never share the map held by the ring.
+	snapshot := make(map[string]float64)
+	for name := range c.loads {
+		snapshot[name] = c.loads[name]
+	}
+	return snapshot
+}
+
+// FindPartitionID returns the partition ID for the given key, which is the
+// hash of the key modulo the partition count.
+func (c *Consistent) FindPartitionID(key []byte) int {
+	return int(c.hasher.Sum64(key) % c.partitionCount)
+}
+
+// GetPartitionOwner returns the owner of the given partition, or nil when the
+// partition table has no entry for partID.
+func (c *Consistent) GetPartitionOwner(partID int) Member {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+
+	if owner, found := c.partitions[partID]; found {
+		// Dereference so the caller receives the value, not the table's pointer.
+		return *owner
+	}
+	return nil
+}
+
+// LocateKey returns the member that owns the partition the given key falls
+// into, or nil when that partition has no owner.
+func (c *Consistent) LocateKey(key []byte) Member {
+	return c.GetPartitionOwner(c.FindPartitionID(key))
+}
+
+// getClosestN returns count members: the owner of partID first, then its successors in
+// the order of the hashed member names. It fails with ErrInsufficientMemberCount when
+// count exceeds the member count and with ErrMemberNotFound when partID has no owner.
+func (c *Consistent) getClosestN(partID, count int) ([]Member, error) {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+
+	res := []Member{}
+	if count > len(c.members) {
+		return res, ErrInsufficientMemberCount
+	}
+
+	// Read the partition table directly: going through GetPartitionOwner would take
+	// the read lock recursively, which deadlocks as soon as a writer is waiting.
+	owner, ok := c.partitions[partID]
+	if !ok && len(c.members) > 0 {
+		return res, ErrMemberNotFound
+	}
+
+	// Hash and sort all the names.
+	var ownerKey uint64
+	keys := make([]uint64, 0, len(c.members))
+	kmems := make(map[uint64]*Member, len(c.members))
+	for name, member := range c.members {
+		key := c.hasher.Sum64([]byte(name))
+		if name == (*owner).String() {
+			ownerKey = key
+		}
+		keys = append(keys, key)
+		kmems[key] = member
+	}
+	sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })
+
+	// Find the key owner; idx is left pointing at its position.
+	idx := 0
+	for ; idx < len(keys); idx++ {
+		if keys[idx] == ownerKey {
+			res = append(res, *kmems[keys[idx]])
+			break
+		}
+	}
+	// Find the closest (replica owner) members, wrapping around the key list.
+	for len(res) < count {
+		idx = (idx + 1) % len(keys)
+		res = append(res, *kmems[keys[idx]])
+	}
+	return res, nil
+}
+
+// GetClosestN returns the closest N members to a key in the hash ring, starting
+// with the owner of the partition the key falls into.
+// This may be useful to find members for replication.
+func (c *Consistent) GetClosestN(key []byte, count int) ([]Member, error) {
+	return c.getClosestN(c.FindPartitionID(key), count)
+}
+
+// GetClosestNForPartition returns the closest N members for the given partition by
+// delegating to getClosestN. This may be useful to find members for replication.
+func (c *Consistent) GetClosestNForPartition(partID, count int) ([]Member, error) {
+ return c.getClosestN(partID, count)
+}