[VOL-2835] Using different topic per ONU device

Change-Id: I3e55064292f28f9bf39ad6bc75fd5758f5313317
diff --git a/vendor/github.com/buraksezer/consistent/.gitignore b/vendor/github.com/buraksezer/consistent/.gitignore
new file mode 100644
index 0000000..a1338d6
--- /dev/null
+++ b/vendor/github.com/buraksezer/consistent/.gitignore
@@ -0,0 +1,14 @@
+# Binaries for programs and plugins
+*.exe
+*.dll
+*.so
+*.dylib
+
+# Test binary, build with `go test -c`
+*.test
+
+# Output of the go coverage tool, specifically when used with LiteIDE
+*.out
+
+# Project-local glide cache, RE: https://github.com/Masterminds/glide/issues/736
+.glide/
diff --git a/vendor/github.com/buraksezer/consistent/.travis.yml b/vendor/github.com/buraksezer/consistent/.travis.yml
new file mode 100644
index 0000000..4f2ee4d
--- /dev/null
+++ b/vendor/github.com/buraksezer/consistent/.travis.yml
@@ -0,0 +1 @@
+language: go
diff --git a/vendor/github.com/buraksezer/consistent/LICENSE b/vendor/github.com/buraksezer/consistent/LICENSE
new file mode 100644
index 0000000..e470334
--- /dev/null
+++ b/vendor/github.com/buraksezer/consistent/LICENSE
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2018 Burak Sezer
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/vendor/github.com/buraksezer/consistent/README.md b/vendor/github.com/buraksezer/consistent/README.md
new file mode 100644
index 0000000..bde53d1
--- /dev/null
+++ b/vendor/github.com/buraksezer/consistent/README.md
@@ -0,0 +1,235 @@
+consistent
+==========
+[![GoDoc](http://img.shields.io/badge/godoc-reference-blue.svg?style=flat)](https://godoc.org/github.com/buraksezer/consistent) [![Build Status](https://travis-ci.org/buraksezer/consistent.svg?branch=master)](https://travis-ci.org/buraksezer/consistent) [![Coverage](http://gocover.io/_badge/github.com/buraksezer/consistent)](http://gocover.io/github.com/buraksezer/consistent) [![Go Report Card](https://goreportcard.com/badge/github.com/buraksezer/consistent)](https://goreportcard.com/report/github.com/buraksezer/consistent) [![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT) [![Mentioned in Awesome Go](https://awesome.re/mentioned-badge.svg)](https://github.com/avelino/awesome-go)  
+
+
+This library provides a consistent hashing function which simultaneously achieves both uniformity and consistency. 
+
+For detailed information about the concept, you should take a look at the following resources:
+
+* [Consistent Hashing with Bounded Loads on Google Research Blog](https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html)
+* [Improving load balancing with a new consistent-hashing algorithm on Vimeo Engineering Blog](https://medium.com/vimeo-engineering-blog/improving-load-balancing-with-a-new-consistent-hashing-algorithm-9f1bd75709ed)
+* [Consistent Hashing with Bounded Loads paper on arXiv](https://arxiv.org/abs/1608.01350)
+
+Table of Content
+----------------
+
+- [Overview](#overview)
+- [Install](#install)
+- [Configuration](#configuration)
+- [Usage](#usage)
+- [Benchmarks](#benchmarks)
+- [Examples](#examples)
+
+Overview
+--------
+
+In this package's context, the keys are distributed among partitions and partitions are distributed among members as well. 
+
+When you create a new consistent instance or call `Add/Remove`:
+
+* The member's name is hashed and inserted into the hash ring,
+* Average load is calculated by the algorithm defined in the paper,
+* Partitions are distributed among members by hashing partition IDs and none of them exceed the average load.
+
+Average load cannot be exceeded. So if all members are loaded at the maximum while trying to add a new member, it panics.
+
+When you want to locate a key by calling `LocateKey`:
+
+* The key(byte slice) is hashed,
+* The result of the hash is mod by the number of partitions,
+* The result of this modulo - `MOD(hash result, partition count)` - is the partition in which the key will be located,
+* Owner of the partition is already determined before calling `LocateKey`. So it returns the partition owner immediately.
+
+No memory is allocated by `consistent` except hashing when you want to locate a key.
+
+Note that the number of partitions cannot be changed after creation. 
+
+Install
+-------
+
+With a correctly configured Go environment:
+
+```
+go get github.com/buraksezer/consistent
+```
+
+You will find some useful usage samples in [examples](https://github.com/buraksezer/consistent/tree/master/_examples) folder.
+
+Configuration
+-------------
+
+```go
+type Config struct {
+	// Hasher is responsible for generating unsigned, 64 bit hash of provided byte slice.
+	Hasher Hasher
+
+	// Keys are distributed among partitions. Prime numbers are good to
+	// distribute keys uniformly. Select a big PartitionCount if you have
+	// too many keys.
+	PartitionCount int
+
+	// Members are replicated on consistent hash ring. This number controls
+	// the number each member is replicated on the ring.
+	ReplicationFactor int
+
+	// Load is used to calculate average load. See the code, the paper and Google's 
+	// blog post to learn about it.
+	Load float64
+}
+```
+
+Any hash algorithm can be used as hasher which implements Hasher interface. Please take a look at the *Sample* section for an example.
+
+Usage
+-----
+
+`LocateKey` function finds a member in the cluster for your key:
+```go
+// With a properly configured and initialized consistent instance
+key := []byte("my-key")
+member := c.LocateKey(key)
+```
+It returns a thread-safe copy of the member you added before.
+
+The second most frequently used function is `GetClosestN`. 
+
+```go
+// With a properly configured and initialized consistent instance
+
+key := []byte("my-key")
+members, err := c.GetClosestN(key, 2)
+```
+
+This may be useful to find backup nodes to store your key.
+
+Benchmarks
+----------
+On an early 2015 Macbook:
+
+```
+BenchmarkAddRemove-4     	  100000	     22006 ns/op
+BenchmarkLocateKey-4     	 5000000	       252 ns/op
+BenchmarkGetClosestN-4   	  500000	      2974 ns/op
+```
+
+Examples
+--------
+
+The most basic use of consistent package should be like this. For detailed list of functions, [visit godoc.org.](https://godoc.org/github.com/buraksezer/consistent)
+More sample code can be found under [_examples](https://github.com/buraksezer/consistent/tree/master/_examples).
+
+```go
+package main
+
+import (
+	"fmt"
+
+	"github.com/buraksezer/consistent"
+	"github.com/cespare/xxhash"
+)
+
+// In your code, you probably have a custom data type 
+// for your cluster members. Just add a String function to implement 
+// consistent.Member interface.
+type myMember string
+
+func (m myMember) String() string {
+	return string(m)
+}
+
+// consistent package doesn't provide a default hashing function. 
+// You should provide a proper one to distribute keys/members uniformly.
+type hasher struct{}
+
+func (h hasher) Sum64(data []byte) uint64 {
+	// you should use a proper hash function for uniformity.
+	return xxhash.Sum64(data)
+}
+
+func main() {
+	// Create a new consistent instance
+	cfg := consistent.Config{
+		PartitionCount:    7,
+		ReplicationFactor: 20,
+		Load:              1.25,
+		Hasher:            hasher{},
+	}
+	c := consistent.New(nil, cfg)
+
+	// Add some members to the consistent hash table.
+	// Add function calculates average load and distributes partitions over members
+	node1 := myMember("node1.olric.com")
+	c.Add(node1)
+
+	node2 := myMember("node2.olric.com")
+	c.Add(node2)
+
+	key := []byte("my-key")
+	// calculates partition id for the given key
+	// partID := hash(key) % partitionCount
+	// the partitions are already distributed among members by Add function.
+	owner := c.LocateKey(key)
+	fmt.Println(owner.String())
+	// Prints node2.olric.com
+}
+```
+
+Another useful example is `_examples/relocation_percentage.go`. It creates a `consistent` object with 8 members and distributes partitions among them. Then adds 9th member, 
+here is the result with a proper configuration and hash function:
+
+```
+bloom:consistent burak$ go run _examples/relocation_percentage.go
+partID: 218 moved to node2.olric from node0.olric
+partID: 173 moved to node9.olric from node3.olric
+partID: 225 moved to node7.olric from node0.olric
+partID:  85 moved to node9.olric from node7.olric
+partID: 220 moved to node5.olric from node0.olric
+partID:  33 moved to node9.olric from node5.olric
+partID: 254 moved to node9.olric from node4.olric
+partID:  71 moved to node9.olric from node3.olric
+partID: 236 moved to node9.olric from node2.olric
+partID: 118 moved to node9.olric from node3.olric
+partID: 233 moved to node3.olric from node0.olric
+partID:  50 moved to node9.olric from node4.olric
+partID: 252 moved to node9.olric from node2.olric
+partID: 121 moved to node9.olric from node2.olric
+partID: 259 moved to node9.olric from node4.olric
+partID:  92 moved to node9.olric from node7.olric
+partID: 152 moved to node9.olric from node3.olric
+partID: 105 moved to node9.olric from node2.olric
+
+6% of the partitions are relocated
+```
+
+Moved partition count is highly dependent on your configuration and quailty of hash function. You should modify the configuration to find an optimum set of configurations
+for your system.
+
+`_examples/load_distribution.go` is also useful to understand load distribution. It creates a `consistent` object with 8 members and locates 1M key. It also calculates average 
+load which cannot be exceeded by any member. Here is the result:
+
+```
+Maximum key count for a member should be around this:  147602
+member: node2.olric, key count: 100362
+member: node5.olric, key count: 99448
+member: node0.olric, key count: 147735
+member: node3.olric, key count: 103455
+member: node6.olric, key count: 147069
+member: node1.olric, key count: 121566
+member: node4.olric, key count: 147932
+member: node7.olric, key count: 132433
+```
+
+Average load can be calculated by using the following formula:
+
+```
+load := (consistent.AverageLoad() * float64(keyCount)) / float64(config.PartitionCount)
+```
+
+Contributions
+-------------
+Please don't hesitate to fork the project and send a pull request or just e-mail me to ask questions and share ideas.
+
+License
+-------
+MIT License, - see LICENSE for more details.
diff --git a/vendor/github.com/buraksezer/consistent/consistent.go b/vendor/github.com/buraksezer/consistent/consistent.go
new file mode 100644
index 0000000..a1446d6
--- /dev/null
+++ b/vendor/github.com/buraksezer/consistent/consistent.go
@@ -0,0 +1,362 @@
+// Copyright (c) 2018 Burak Sezer
+// All rights reserved.
+//
+// This code is licensed under the MIT License.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files(the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions :
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+// Package consistent provides a consistent hashing function with bounded loads.
+// For more information about the underlying algorithm, please take a look at
+// https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html
+//
+// Example Use:
+// 	cfg := consistent.Config{
+// 		PartitionCount:    71,
+// 		ReplicationFactor: 20,
+// 		Load:              1.25,
+// 		Hasher:            hasher{},
+//	}
+//
+//      // Create a new consistent object
+//      // You may call this with a list of members
+//      // instead of adding them one by one.
+//	c := consistent.New(members, cfg)
+//
+//      // myMember struct just needs to implement a String method.
+//      // New/Add/Remove distributes partitions among members using the algorithm
+//      // defined on Google Research Blog.
+//	c.Add(myMember)
+//
+//	key := []byte("my-key")
+//      // LocateKey hashes the key and calculates partition ID with
+//      // this modulo operation: MOD(hash result, partition count)
+//      // The owner of the partition is already calculated by New/Add/Remove.
+//      // LocateKey just returns the member which's responsible for the key.
+//	member := c.LocateKey(key)
+//
+package consistent
+
+import (
+	"encoding/binary"
+	"errors"
+	"fmt"
+	"math"
+	"sort"
+	"sync"
+)
+
+var (
+	//ErrInsufficientMemberCount represents an error which means there are not enough members to complete the task.
+	ErrInsufficientMemberCount = errors.New("insufficient member count")
+
+	// ErrMemberNotFound represents an error which means requested member could not be found in consistent hash ring.
+	ErrMemberNotFound = errors.New("member could not be found in ring")
+)
+
+// Hasher is responsible for generating unsigned, 64 bit hash of provided byte slice.
+// Hasher should minimize collisions (generating same hash for different byte slice)
+// and while performance is also important fast functions are preferable (i.e.
+// you can use FarmHash family).
+type Hasher interface {
+	Sum64([]byte) uint64
+}
+
+// Member interface represents a member in consistent hash ring.
+type Member interface {
+	String() string
+}
+
+// Config represents a structure to control consistent package.
+type Config struct {
+	// Hasher is responsible for generating unsigned, 64 bit hash of provided byte slice.
+	Hasher Hasher
+
+	// Keys are distributed among partitions. Prime numbers are good to
+	// distribute keys uniformly. Select a big PartitionCount if you have
+	// too many keys.
+	PartitionCount int
+
+	// Members are replicated on consistent hash ring. This number means that a member
+	// how many times replicated on the ring.
+	ReplicationFactor int
+
+	// Load is used to calculate average load. See the code, the paper and Google's blog post to learn about it.
+	Load float64
+}
+
+// Consistent holds the information about the members of the consistent hash circle.
+type Consistent struct {
+	mu sync.RWMutex
+
+	config         Config
+	hasher         Hasher
+	sortedSet      []uint64
+	partitionCount uint64
+	loads          map[string]float64
+	members        map[string]*Member
+	partitions     map[int]*Member
+	ring           map[uint64]*Member
+}
+
+// New creates and returns a new Consistent object.
+func New(members []Member, config Config) *Consistent {
+	c := &Consistent{
+		config:         config,
+		members:        make(map[string]*Member),
+		partitionCount: uint64(config.PartitionCount),
+		ring:           make(map[uint64]*Member),
+	}
+	if config.Hasher == nil {
+		panic("Hasher cannot be nil")
+	}
+	// TODO: Check configuration here
+	c.hasher = config.Hasher
+	for _, member := range members {
+		c.add(member)
+	}
+	if members != nil {
+		c.distributePartitions()
+	}
+	return c
+}
+
+// GetMembers returns a thread-safe copy of members.
+func (c *Consistent) GetMembers() []Member {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+
+	// Create a thread-safe copy of member list.
+	members := make([]Member, 0, len(c.members))
+	for _, member := range c.members {
+		members = append(members, *member)
+	}
+	return members
+}
+
+// AverageLoad exposes the current average load.
+func (c *Consistent) AverageLoad() float64 {
+	avgLoad := float64(c.partitionCount/uint64(len(c.members))) * c.config.Load
+	return math.Ceil(avgLoad)
+}
+
+func (c *Consistent) distributeWithLoad(partID, idx int, partitions map[int]*Member, loads map[string]float64) {
+	avgLoad := c.AverageLoad()
+	var count int
+	for {
+		count++
+		if count >= len(c.sortedSet) {
+			// User needs to decrease partition count, increase member count or increase load factor.
+			panic("not enough room to distribute partitions")
+		}
+		i := c.sortedSet[idx]
+		member := *c.ring[i]
+		load := loads[member.String()]
+		if load+1 <= avgLoad {
+			partitions[partID] = &member
+			loads[member.String()]++
+			return
+		}
+		idx++
+		if idx >= len(c.sortedSet) {
+			idx = 0
+		}
+	}
+}
+
+func (c *Consistent) distributePartitions() {
+	loads := make(map[string]float64)
+	partitions := make(map[int]*Member)
+
+	bs := make([]byte, 8)
+	for partID := uint64(0); partID < c.partitionCount; partID++ {
+		binary.LittleEndian.PutUint64(bs, partID)
+		key := c.hasher.Sum64(bs)
+		idx := sort.Search(len(c.sortedSet), func(i int) bool {
+			return c.sortedSet[i] >= key
+		})
+		if idx >= len(c.sortedSet) {
+			idx = 0
+		}
+		c.distributeWithLoad(int(partID), idx, partitions, loads)
+	}
+	c.partitions = partitions
+	c.loads = loads
+}
+
+func (c *Consistent) add(member Member) {
+	for i := 0; i < c.config.ReplicationFactor; i++ {
+		key := []byte(fmt.Sprintf("%s%d", member.String(), i))
+		h := c.hasher.Sum64(key)
+		c.ring[h] = &member
+		c.sortedSet = append(c.sortedSet, h)
+	}
+	// sort hashes ascendingly
+	sort.Slice(c.sortedSet, func(i int, j int) bool {
+		return c.sortedSet[i] < c.sortedSet[j]
+	})
+	// Storing member at this map is useful to find backup members of a partition.
+	c.members[member.String()] = &member
+}
+
+// Add adds a new member to the consistent hash circle.
+func (c *Consistent) Add(member Member) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	if _, ok := c.members[member.String()]; ok {
+		// We already have this member. Quit immediately.
+		return
+	}
+	c.add(member)
+	c.distributePartitions()
+}
+
+func (c *Consistent) delSlice(val uint64) {
+	for i := 0; i < len(c.sortedSet); i++ {
+		if c.sortedSet[i] == val {
+			c.sortedSet = append(c.sortedSet[:i], c.sortedSet[i+1:]...)
+			break
+		}
+	}
+}
+
+// Remove removes a member from the consistent hash circle.
+func (c *Consistent) Remove(name string) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	if _, ok := c.members[name]; !ok {
+		// There is no member with that name. Quit immediately.
+		return
+	}
+
+	for i := 0; i < c.config.ReplicationFactor; i++ {
+		key := []byte(fmt.Sprintf("%s%d", name, i))
+		h := c.hasher.Sum64(key)
+		delete(c.ring, h)
+		c.delSlice(h)
+	}
+	delete(c.members, name)
+	if len(c.members) == 0 {
+		// consistent hash ring is empty now. Reset the partition table.
+		c.partitions = make(map[int]*Member)
+		return
+	}
+	c.distributePartitions()
+}
+
+// LoadDistribution exposes load distribution of members.
+func (c *Consistent) LoadDistribution() map[string]float64 {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+
+	// Create a thread-safe copy
+	res := make(map[string]float64)
+	for member, load := range c.loads {
+		res[member] = load
+	}
+	return res
+}
+
+// FindPartitionID returns partition id for given key.
+func (c *Consistent) FindPartitionID(key []byte) int {
+	hkey := c.hasher.Sum64(key)
+	return int(hkey % c.partitionCount)
+}
+
+// GetPartitionOwner returns the owner of the given partition.
+func (c *Consistent) GetPartitionOwner(partID int) Member {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+
+	member, ok := c.partitions[partID]
+	if !ok {
+		return nil
+	}
+	// Create a thread-safe copy of member and return it.
+	return *member
+}
+
+// LocateKey finds a home for given key
+func (c *Consistent) LocateKey(key []byte) Member {
+	partID := c.FindPartitionID(key)
+	return c.GetPartitionOwner(partID)
+}
+
+func (c *Consistent) getClosestN(partID, count int) ([]Member, error) {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+
+	res := []Member{}
+	if count > len(c.members) {
+		return res, ErrInsufficientMemberCount
+	}
+
+	var ownerKey uint64
+	owner := c.GetPartitionOwner(partID)
+	// Hash and sort all the names.
+	keys := []uint64{}
+	kmems := make(map[uint64]*Member)
+	for name, member := range c.members {
+		key := c.hasher.Sum64([]byte(name))
+		if name == owner.String() {
+			ownerKey = key
+		}
+		keys = append(keys, key)
+		kmems[key] = member
+	}
+	sort.Slice(keys, func(i, j int) bool {
+		return keys[i] < keys[j]
+	})
+
+	// Find the key owner
+	idx := 0
+	for idx < len(keys) {
+		if keys[idx] == ownerKey {
+			key := keys[idx]
+			res = append(res, *kmems[key])
+			break
+		}
+		idx++
+	}
+
+	// Find the closest(replica owners) members.
+	for len(res) < count {
+		idx++
+		if idx >= len(keys) {
+			idx = 0
+		}
+		key := keys[idx]
+		res = append(res, *kmems[key])
+	}
+	return res, nil
+}
+
+// GetClosestN returns the closest N member to a key in the hash ring.
+// This may be useful to find members for replication.
+func (c *Consistent) GetClosestN(key []byte, count int) ([]Member, error) {
+	partID := c.FindPartitionID(key)
+	return c.getClosestN(partID, count)
+}
+
+// GetClosestNForPartition returns the closest N member for given partition.
+// This may be useful to find members for replication.
+func (c *Consistent) GetClosestNForPartition(partID, count int) ([]Member, error) {
+	return c.getClosestN(partID, count)
+}