[VOL-2836] Using different topic per ONU device

+# Binaries for programs and plugins
+# Test binary, build with `go test -c`
+# Output of the go coverage tool, specifically when used with LiteIDE
+# Project-local glide cache, RE: https://github.com/Masterminds/glide/issues/736
+MIT License
+Copyright (c) 2018 Burak Sezer
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
[![GoDoc](http://img.shields.io/badge/godoc-reference-blue.svg?style=flat)](https://godoc.org/github.com/buraksezer/consistent) [![Build Status](https://travis-ci.org/buraksezer/consistent.svg?branch=master)](https://travis-ci.org/buraksezer/consistent) [![Coverage](http://gocover.io/_badge/github.com/buraksezer/consistent)](http://gocover.io/github.com/buraksezer/consistent) [![Go Report Card](https://goreportcard.com/badge/github.com/buraksezer/consistent)](https://goreportcard.com/report/github.com/buraksezer/consistent) [![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT) [![Mentioned in Awesome Go](https://awesome.re/mentioned-badge.svg)](https://github.com/avelino/awesome-go)  
+This library provides a consistent hashing function which simultaneously achieves both uniformity and consistency. 
+For detailed information about the concept, you should take a look at the following resources:
+* [Consistent Hashing with Bounded Loads on Google Research Blog](https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html)
+* [Improving load balancing with a new consistent-hashing algorithm on Vimeo Engineering Blog](https://medium.com/vimeo-engineering-blog/improving-load-balancing-with-a-new-consistent-hashing-algorithm-9f1bd75709ed)
+* [Consistent Hashing with Bounded Loads paper on arXiv](https://arxiv.org/abs/1608.01350)
+Table of Content
+- [Overview](#overview)
+- [Install](#install)
+- [Configuration](#configuration)
+- [Usage](#usage)
+- [Benchmarks](#benchmarks)
+- [Examples](#examples)
+In this package's context, the keys are distributed among partitions and partitions are distributed among members as well. 
+When you create a new consistent instance or call `Add/Remove`:
+* The member's name is hashed and inserted into the hash ring,
+* Average load is calculated by the algorithm defined in the paper,
+* Partitions are distributed among members by hashing partition IDs and none of them exceed the average load.
+Average load cannot be exceeded. So if all members are loaded at the maximum while trying to add a new member, it panics.
+When you want to locate a key by calling `LocateKey`:
+* The key(byte slice) is hashed,
+* The result of the hash is mod by the number of partitions,
+* The result of this modulo - `MOD(hash result, partition count)` - is the partition in which the key will be located,
+* Owner of the partition is already determined before calling `LocateKey`. So it returns the partition owner immediately.
+No memory is allocated by `consistent` except hashing when you want to locate a key.
+Note that the number of partitions cannot be changed after creation. 
+With a correctly configured Go environment:
+go get github.com/buraksezer/consistent
+You will find some useful usage samples in [examples](https://github.com/buraksezer/consistent/tree/master/_examples) folder.
+type Config struct {
+	// Hasher is responsible for generating unsigned, 64 bit hash of provided byte slice.
+	Hasher Hasher
+	// Keys are distributed among partitions. Prime numbers are good to
+	// distribute keys uniformly. Select a big PartitionCount if you have
+	// too many keys.
+	PartitionCount int
+	// Members are replicated on consistent hash ring. This number controls
+	// the number each member is replicated on the ring.
+	ReplicationFactor int
+	// Load is used to calculate average load. See the code, the paper and Google's 
+	// blog post to learn about it.
+	Load float64
+Any hash algorithm can be used as hasher which implements Hasher interface. Please take a look at the *Sample* section for an example.
+`LocateKey` function finds a member in the cluster for your key:
+// With a properly configured and initialized consistent instance
+key := []byte("my-key")
+member := c.LocateKey(key)
+It returns a thread-safe copy of the member you added before.
+The second most frequently used function is `GetClosestN`. 
+// With a properly configured and initialized consistent instance
+key := []byte("my-key")
+members, err := c.GetClosestN(key, 2)
+This may be useful to find backup nodes to store your key.
+On an early 2015 Macbook:
+BenchmarkAddRemove-4     	  100000	     22006 ns/op
+BenchmarkLocateKey-4     	 5000000	       252 ns/op
+BenchmarkGetClosestN-4   	  500000	      2974 ns/op
+The most basic use of consistent package should be like this. For detailed list of functions, [visit godoc.org.](https://godoc.org/github.com/buraksezer/consistent)
+More sample code can be found under [_examples](https://github.com/buraksezer/consistent/tree/master/_examples).
+package main
+import (
+	"fmt"
+	"github.com/buraksezer/consistent"
+	"github.com/cespare/xxhash"
+// In your code, you probably have a custom data type 
+// for your cluster members. Just add a String function to implement 
+// consistent.Member interface.
+type myMember string
+func (m myMember) String() string {
+	return string(m)
+// consistent package doesn't provide a default hashing function. 
+// You should provide a proper one to distribute keys/members uniformly.
+type hasher struct{}
+func (h hasher) Sum64(data []byte) uint64 {
+	// you should use a proper hash function for uniformity.
+	return xxhash.Sum64(data)
+func main() {
+	// Create a new consistent instance
+	cfg := consistent.Config{
+		PartitionCount:    7,
+		ReplicationFactor: 20,
+		Load:              1.25,
+		Hasher:            hasher{},
+	}
+	c := consistent.New(nil, cfg)
+	// Add some members to the consistent hash table.
+	// Add function calculates average load and distributes partitions over members
+	node1 := myMember("node1.olric.com")
+	c.Add(node1)
+	node2 := myMember("node2.olric.com")
+	c.Add(node2)
+	key := []byte("my-key")
+	// calculates partition id for the given key
+	// partID := hash(key) % partitionCount
+	// the partitions are already distributed among members by Add function.
+	owner := c.LocateKey(key)
+	fmt.Println(owner.String())
+	// Prints node2.olric.com
+Another useful example is `_examples/relocation_percentage.go`. It creates a `consistent` object with 8 members and distributes partitions among them. Then adds 9th member, 
+here is the result with a proper configuration and hash function:
+bloom:consistent burak$ go run _examples/relocation_percentage.go
+partID: 218 moved to node2.olric from node0.olric
+partID: 173 moved to node9.olric from node3.olric
+partID: 225 moved to node7.olric from node0.olric
+partID:  85 moved to node9.olric from node7.olric
+partID: 220 moved to node5.olric from node0.olric
+partID:  33 moved to node9.olric from node5.olric
+partID: 254 moved to node9.olric from node4.olric
+partID:  71 moved to node9.olric from node3.olric
+partID: 236 moved to node9.olric from node2.olric
+partID: 118 moved to node9.olric from node3.olric
+partID: 233 moved to node3.olric from node0.olric
+partID:  50 moved to node9.olric from node4.olric
+partID: 252 moved to node9.olric from node2.olric
+partID: 121 moved to node9.olric from node2.olric
+partID: 259 moved to node9.olric from node4.olric
+partID:  92 moved to node9.olric from node7.olric
+partID: 152 moved to node9.olric from node3.olric
+partID: 105 moved to node9.olric from node2.olric
+6% of the partitions are relocated
+Moved partition count is highly dependent on your configuration and quailty of hash function. You should modify the configuration to find an optimum set of configurations
+for your system.
+`_examples/load_distribution.go` is also useful to understand load distribution. It creates a `consistent` object with 8 members and locates 1M key. It also calculates average 
+load which cannot be exceeded by any member. Here is the result:
+Maximum key count for a member should be around this:  147602
+member: node2.olric, key count: 100362
+member: node5.olric, key count: 99448
+member: node0.olric, key count: 147735
+member: node3.olric, key count: 103455
+member: node6.olric, key count: 147069
+member: node1.olric, key count: 121566
+member: node4.olric, key count: 147932
+member: node7.olric, key count: 132433
+Average load can be calculated by using the following formula:
+load := (consistent.AverageLoad() * float64(keyCount)) / float64(config.PartitionCount)
+Please don't hesitate to fork the project and send a pull request or just e-mail me to ask questions and share ideas.
+MIT License, - see LICENSE for more details.
+// Copyright (c) 2018 Burak Sezer
+// All rights reserved.
+// This code is licensed under the MIT License.
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files(the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions :
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+// Package consistent provides a consistent hashing function with bounded loads.
+// For more information about the underlying algorithm, please take a look at
+// https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html
+// Example Use:
+// 	cfg := consistent.Config{
+// 		PartitionCount:    71,
+// 		ReplicationFactor: 20,
+// 		Load:              1.25,
+// 		Hasher:            hasher{},
+//	}
+//      // Create a new consistent object
+//      // You may call this with a list of members
+//      // instead of adding them one by one.
+//	c := consistent.New(members, cfg)
+//      // myMember struct just needs to implement a String method.
+//      // New/Add/Remove distributes partitions among members using the algorithm
+//      // defined on Google Research Blog.
+//	c.Add(myMember)
+//	key := []byte("my-key")
+//      // LocateKey hashes the key and calculates partition ID with
+//      // this modulo operation: MOD(hash result, partition count)
+//      // The owner of the partition is already calculated by New/Add/Remove.
+//      // LocateKey just returns the member which's responsible for the key.
+//	member := c.LocateKey(key)
+package consistent
+import (
+	"encoding/binary"
+	"errors"
+	"fmt"
+	"math"
+	"sort"
+	"sync"
+var (
+	//ErrInsufficientMemberCount represents an error which means there are not enough members to complete the task.
+	ErrInsufficientMemberCount = errors.New("insufficient member count")
+	// ErrMemberNotFound represents an error which means requested member could not be found in consistent hash ring.
+	ErrMemberNotFound = errors.New("member could not be found in ring")
+// Hasher is responsible for generating unsigned, 64 bit hash of provided byte slice.
+// Hasher should minimize collisions (generating same hash for different byte slice)
+// and while performance is also important fast functions are preferable (i.e.
+// you can use FarmHash family).
+type Hasher interface {
+	Sum64([]byte) uint64
+// Member interface represents a member in consistent hash ring.
+type Member interface {
+	String() string
+// Config represents a structure to control consistent package.
+type Config struct {
+	// Hasher is responsible for generating unsigned, 64 bit hash of provided byte slice.
+	Hasher Hasher
+	// Keys are distributed among partitions. Prime numbers are good to
+	// distribute keys uniformly. Select a big PartitionCount if you have
+	// too many keys.
+	PartitionCount int
+	// Members are replicated on consistent hash ring. This number means that a member
+	// how many times replicated on the ring.
+	ReplicationFactor int
+	// Load is used to calculate average load. See the code, the paper and Google's blog post to learn about it.
+	Load float64
+// Consistent holds the information about the members of the consistent hash circle.
+type Consistent struct {
+	mu sync.RWMutex
+	config         Config
+	hasher         Hasher
+	sortedSet      []uint64
+	partitionCount uint64
+	loads          map[string]float64
+	members        map[string]*Member
+	partitions     map[int]*Member
+	ring           map[uint64]*Member
+// New creates and returns a new Consistent object.
+func New(members []Member, config Config) *Consistent {
+	c := &Consistent{
+		config:         config,
+		members:        make(map[string]*Member),
+		partitionCount: uint64(config.PartitionCount),
+		ring:           make(map[uint64]*Member),
+	}
+	if config.Hasher == nil {
+		panic("Hasher cannot be nil")
+	}
+	// TODO: Check configuration here
+	c.hasher = config.Hasher
+	for _, member := range members {
+		c.add(member)
+	}
+	if members != nil {
+		c.distributePartitions()
+	}
+	return c
+// GetMembers returns a thread-safe copy of members.
+func (c *Consistent) GetMembers() []Member {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+	// Create a thread-safe copy of member list.
+	members := make([]Member, 0, len(c.members))
+	for _, member := range c.members {
+		members = append(members, *member)
+	}
+	return members
+// AverageLoad exposes the current average load.
+func (c *Consistent) AverageLoad() float64 {
+	avgLoad := float64(c.partitionCount/uint64(len(c.members))) * c.config.Load
+	return math.Ceil(avgLoad)
+func (c *Consistent) distributeWithLoad(partID, idx int, partitions map[int]*Member, loads map[string]float64) {
+	avgLoad := c.AverageLoad()
+	var count int
+	for {
+		count++
+		if count >= len(c.sortedSet) {
+			// User needs to decrease partition count, increase member count or increase load factor.
+			panic("not enough room to distribute partitions")
+		}
+		i := c.sortedSet[idx]
+		member := *c.ring[i]
+		load := loads[member.String()]
+		if load+1 <= avgLoad {
+			partitions[partID] = &member
+			loads[member.String()]++
+			return
+		}
+		idx++
+		if idx >= len(c.sortedSet) {
+			idx = 0
+		}
+	}
+func (c *Consistent) distributePartitions() {
+	loads := make(map[string]float64)
+	partitions := make(map[int]*Member)
+	bs := make([]byte, 8)
+	for partID := uint64(0); partID < c.partitionCount; partID++ {
+		binary.LittleEndian.PutUint64(bs, partID)
+		key := c.hasher.Sum64(bs)
+		idx := sort.Search(len(c.sortedSet), func(i int) bool {
+			return c.sortedSet[i] >= key
+		})
+		if idx >= len(c.sortedSet) {
+			idx = 0
+		}
+		c.distributeWithLoad(int(partID), idx, partitions, loads)
+	}
+	c.partitions = partitions
+	c.loads = loads
+func (c *Consistent) add(member Member) {
+	for i := 0; i < c.config.ReplicationFactor; i++ {
+		key := []byte(fmt.Sprintf("%s%d", member.String(), i))
+		h := c.hasher.Sum64(key)
+		c.ring[h] = &member
+		c.sortedSet = append(c.sortedSet, h)
+	}
+	// sort hashes ascendingly
+	sort.Slice(c.sortedSet, func(i int, j int) bool {
+		return c.sortedSet[i] < c.sortedSet[j]
+	})
+	// Storing member at this map is useful to find backup members of a partition.
+	c.members[member.String()] = &member
+// Add adds a new member to the consistent hash circle.
+func (c *Consistent) Add(member Member) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if _, ok := c.members[member.String()]; ok {
+		// We already have this member. Quit immediately.
+		return
+	}
+	c.add(member)
+	c.distributePartitions()
+func (c *Consistent) delSlice(val uint64) {
+	for i := 0; i < len(c.sortedSet); i++ {
+		if c.sortedSet[i] == val {
+			c.sortedSet = append(c.sortedSet[:i], c.sortedSet[i+1:]...)
+			break
+		}
+	}
+// Remove removes a member from the consistent hash circle.
+func (c *Consistent) Remove(name string) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if _, ok := c.members[name]; !ok {
+		// There is no member with that name. Quit immediately.
+		return
+	}
+	for i := 0; i < c.config.ReplicationFactor; i++ {
+		key := []byte(fmt.Sprintf("%s%d", name, i))
+		h := c.hasher.Sum64(key)
+		delete(c.ring, h)
+		c.delSlice(h)
+	}
+	delete(c.members, name)
+	if len(c.members) == 0 {
+		// consistent hash ring is empty now. Reset the partition table.
+		c.partitions = make(map[int]*Member)
+		return
+	}
+	c.distributePartitions()
+// LoadDistribution exposes load distribution of members.
+func (c *Consistent) LoadDistribution() map[string]float64 {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+	// Create a thread-safe copy
+	res := make(map[string]float64)
+	for member, load := range c.loads {
+		res[member] = load
+	}
+	return res
+// FindPartitionID returns partition id for given key.
+func (c *Consistent) FindPartitionID(key []byte) int {
+	hkey := c.hasher.Sum64(key)
+	return int(hkey % c.partitionCount)
+// GetPartitionOwner returns the owner of the given partition.
+func (c *Consistent) GetPartitionOwner(partID int) Member {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+	member, ok := c.partitions[partID]
+	if !ok {
+		return nil
+	}
+	// Create a thread-safe copy of member and return it.
+	return *member
+// LocateKey finds a home for given key
+func (c *Consistent) LocateKey(key []byte) Member {
+	partID := c.FindPartitionID(key)
+	return c.GetPartitionOwner(partID)
+func (c *Consistent) getClosestN(partID, count int) ([]Member, error) {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+	res := []Member{}
+	if count > len(c.members) {
+		return res, ErrInsufficientMemberCount
+	}
+	var ownerKey uint64
+	owner := c.GetPartitionOwner(partID)
+	// Hash and sort all the names.
+	keys := []uint64{}
+	kmems := make(map[uint64]*Member)
+	for name, member := range c.members {
+		key := c.hasher.Sum64([]byte(name))
+		if name == owner.String() {
+			ownerKey = key
+		}
+		keys = append(keys, key)
+		kmems[key] = member
+	}
+	sort.Slice(keys, func(i, j int) bool {
+		return keys[i] < keys[j]
+	})
+	// Find the key owner
+	idx := 0
+	for idx < len(keys) {
+		if keys[idx] == ownerKey {
+			key := keys[idx]
+			res = append(res, *kmems[key])
+			break
+		}
+		idx++
+	}
+	// Find the closest(replica owners) members.
+	for len(res) < count {
+		idx++
+		if idx >= len(keys) {
+			idx = 0
+		}
+		key := keys[idx]
+		res = append(res, *kmems[key])
+	}
+	return res, nil
+// GetClosestN returns the closest N member to a key in the hash ring.
+// This may be useful to find members for replication.
+func (c *Consistent) GetClosestN(key []byte, count int) ([]Member, error) {
+	partID := c.FindPartitionID(key)
+	return c.getClosestN(partID, count)
+// GetClosestNForPartition returns the closest N member for given partition.
+// This may be useful to find members for replication.
+func (c *Consistent) GetClosestNForPartition(partID, count int) ([]Member, error) {
+	return c.getClosestN(partID, count)
+Copyright (c) 2016 Caleb Spare
+MIT License
+Permission is hereby granted, free of charge, to any person obtaining
+a copy of this software and associated documentation files (the
+"Software"), to deal in the Software without restriction, including
+without limitation the rights to use, copy, modify, merge, publish,
+distribute, sublicense, and/or sell copies of the Software, and to
+permit persons to whom the Software is furnished to do so, subject to
+the following conditions:
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
+# xxhash
+xxhash is a Go implementation of the 64-bit
+[xxHash](http://cyan4973.github.io/xxHash/) algorithm, XXH64. This is a
+high-quality hashing algorithm that is much faster than anything in the Go
+standard library.
+The API is very small, taking its cue from the other hashing packages in the
+standard library:
+    $ go doc github.com/cespare/xxhash                                                                                                                                                                                              !
+    package xxhash // import "github.com/cespare/xxhash"
+    Package xxhash implements the 64-bit variant of xxHash (XXH64) as described
+    at http://cyan4973.github.io/xxHash/.
+    func New() hash.Hash64
+    func Sum64(b []byte) uint64
+    func Sum64String(s string) uint64
+This implementation provides a fast pure-Go implementation and an even faster
+assembly implementation for amd64.
+## Benchmarks
+Here are some quick benchmarks comparing the pure-Go and assembly
+implementations of Sum64 against another popular Go XXH64 implementation,
+| input size | OneOfOne | cespare (purego) | cespare |
+| --- | --- | --- | --- |
+| 5 B   |  416 MB/s | 720 MB/s |  872 MB/s  |
+| 100 B | 3980 MB/s | 5013 MB/s | 5252 MB/s  |
+| 4 KB  | 12727 MB/s | 12999 MB/s | 13026 MB/s |
+| 10 MB | 9879 MB/s | 10775 MB/s | 10913 MB/s  |
+These numbers were generated with:
+$ go test -benchtime 10s -bench '/OneOfOne,'
+$ go test -tags purego -benchtime 10s -bench '/xxhash,'
+$ go test -benchtime 10s -bench '/xxhash,'
+## Projects using this package
+- [InfluxDB](https://github.com/influxdata/influxdb)
+- [Prometheus](https://github.com/prometheus/prometheus)
+module github.com/cespare/xxhash
+require (
+	github.com/OneOfOne/xxhash v1.2.2
+	github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72
+github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
+github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
+github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ=
+github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
+// +build !go1.9
+package xxhash
+// TODO(caleb): After Go 1.10 comes out, remove this fallback code.
+func rol1(x uint64) uint64  { return (x << 1) | (x >> (64 - 1)) }
+func rol7(x uint64) uint64  { return (x << 7) | (x >> (64 - 7)) }
+func rol11(x uint64) uint64 { return (x << 11) | (x >> (64 - 11)) }
+func rol12(x uint64) uint64 { return (x << 12) | (x >> (64 - 12)) }
+func rol18(x uint64) uint64 { return (x << 18) | (x >> (64 - 18)) }
+func rol23(x uint64) uint64 { return (x << 23) | (x >> (64 - 23)) }
+func rol27(x uint64) uint64 { return (x << 27) | (x >> (64 - 27)) }
+func rol31(x uint64) uint64 { return (x << 31) | (x >> (64 - 31)) }
+// +build go1.9
+package xxhash
+import "math/bits"
+func rol1(x uint64) uint64  { return bits.RotateLeft64(x, 1) }
+func rol7(x uint64) uint64  { return bits.RotateLeft64(x, 7) }
+func rol11(x uint64) uint64 { return bits.RotateLeft64(x, 11) }
+func rol12(x uint64) uint64 { return bits.RotateLeft64(x, 12) }
+func rol18(x uint64) uint64 { return bits.RotateLeft64(x, 18) }
+func rol23(x uint64) uint64 { return bits.RotateLeft64(x, 23) }
+func rol27(x uint64) uint64 { return bits.RotateLeft64(x, 27) }
+func rol31(x uint64) uint64 { return bits.RotateLeft64(x, 31) }
+// Package xxhash implements the 64-bit variant of xxHash (XXH64) as described
+// at http://cyan4973.github.io/xxHash/.
+package xxhash
+import (
+	"encoding/binary"
+	"hash"
+const (
+	prime1 uint64 = 11400714785074694791
+	prime2 uint64 = 14029467366897019727
+	prime3 uint64 = 1609587929392839161
+	prime4 uint64 = 9650029242287828579
+	prime5 uint64 = 2870177450012600261
+// NOTE(caleb): I'm using both consts and vars of the primes. Using consts where
+// possible in the Go code is worth a small (but measurable) performance boost
+// by avoiding some MOVQs. Vars are needed for the asm and also are useful for
+// convenience in the Go code in a few places where we need to intentionally
+// avoid constant arithmetic (e.g., v1 := prime1 + prime2 fails because the
+// result overflows a uint64).
+var (
+	prime1v = prime1
+	prime2v = prime2
+	prime3v = prime3
+	prime4v = prime4
+	prime5v = prime5
+type xxh struct {
+	v1    uint64
+	v2    uint64
+	v3    uint64
+	v4    uint64
+	total int
+	mem   [32]byte
+	n     int // how much of mem is used
+// New creates a new hash.Hash64 that implements the 64-bit xxHash algorithm.
+func New() hash.Hash64 {
+	var x xxh
+	x.Reset()
+	return &x
+func (x *xxh) Reset() {
+	x.n = 0
+	x.total = 0
+	x.v1 = prime1v + prime2
+	x.v2 = prime2
+	x.v3 = 0
+	x.v4 = -prime1v
+func (x *xxh) Size() int      { return 8 }
+func (x *xxh) BlockSize() int { return 32 }
+// Write adds more data to x. It always returns len(b), nil.
+func (x *xxh) Write(b []byte) (n int, err error) {
+	n = len(b)
+	x.total += len(b)
+	if x.n+len(b) < 32 {
+		// This new data doesn't even fill the current block.
+		copy(x.mem[x.n:], b)
+		x.n += len(b)
+		return
+	}
+	if x.n > 0 {
+		// Finish off the partial block.
+		copy(x.mem[x.n:], b)
+		x.v1 = round(x.v1, u64(x.mem[0:8]))
+		x.v2 = round(x.v2, u64(x.mem[8:16]))
+		x.v3 = round(x.v3, u64(x.mem[16:24]))
+		x.v4 = round(x.v4, u64(x.mem[24:32]))
+		b = b[32-x.n:]
+		x.n = 0
+	}
+	if len(b) >= 32 {
+		// One or more full blocks left.
+		b = writeBlocks(x, b)
+	}
+	// Store any remaining partial block.
+	copy(x.mem[:], b)
+	x.n = len(b)
+	return
+func (x *xxh) Sum(b []byte) []byte {
+	s := x.Sum64()
+	return append(
+		b,
+		byte(s>>56),
+		byte(s>>48),
+		byte(s>>40),
+		byte(s>>32),
+		byte(s>>24),
+		byte(s>>16),
+		byte(s>>8),
+		byte(s),
+	)
+func (x *xxh) Sum64() uint64 {
+	var h uint64
+	if x.total >= 32 {
+		v1, v2, v3, v4 := x.v1, x.v2, x.v3, x.v4
+		h = rol1(v1) + rol7(v2) + rol12(v3) + rol18(v4)
+		h = mergeRound(h, v1)
+		h = mergeRound(h, v2)
+		h = mergeRound(h, v3)
+		h = mergeRound(h, v4)
+	} else {
+		h = x.v3 + prime5
+	}
+	h += uint64(x.total)
+	i, end := 0, x.n
+	for ; i+8 <= end; i += 8 {
+		k1 := round(0, u64(x.mem[i:i+8]))
+		h ^= k1
+		h = rol27(h)*prime1 + prime4
+	}
+	if i+4 <= end {
+		h ^= uint64(u32(x.mem[i:i+4])) * prime1
+		h = rol23(h)*prime2 + prime3
+		i += 4
+	}
+	for i < end {
+		h ^= uint64(x.mem[i]) * prime5
+		h = rol11(h) * prime1
+		i++
+	}
+	h ^= h >> 33
+	h *= prime2
+	h ^= h >> 29
+	h *= prime3
+	h ^= h >> 32
+	return h
+func u64(b []byte) uint64 { return binary.LittleEndian.Uint64(b) }
+func u32(b []byte) uint32 { return binary.LittleEndian.Uint32(b) }
+func round(acc, input uint64) uint64 {
+	acc += input * prime2
+	acc = rol31(acc)
+	acc *= prime1
+	return acc
+func mergeRound(acc, val uint64) uint64 {
+	val = round(0, val)
+	acc ^= val
+	acc = acc*prime1 + prime4
+	return acc
+// +build !appengine
+// +build gc
+// +build !purego
+package xxhash
+// Sum64 computes the 64-bit xxHash digest of b.
+func Sum64(b []byte) uint64
+func writeBlocks(x *xxh, b []byte) []byte
+// +build !appengine
+// +build gc
+// +build !purego
+#include "textflag.h"
+// Register allocation:
+// AX	h
+// CX	pointer to advance through b
+// DX	n
+// BX	loop end
+// R8	v1, k1
+// R9	v2
+// R10	v3
+// R11	v4
+// R12	tmp
+// R13	prime1v
+// R14	prime2v
+// R15	prime4v
+// round reads from and advances the buffer pointer in CX.
+// It assumes that R13 has prime1v and R14 has prime2v.
+#define round(r) \
+	MOVQ  (CX), R12 \
+	ADDQ  $8, CX    \
+	IMULQ R14, R12  \
+	ADDQ  R12, r    \
+	ROLQ  $31, r    \
+	IMULQ R13, r
+// mergeRound applies a merge round on the two registers acc and val.
+// It assumes that R13 has prime1v, R14 has prime2v, and R15 has prime4v.
+#define mergeRound(acc, val) \
+	IMULQ R14, val \
+	ROLQ  $31, val \
+	IMULQ R13, val \
+	XORQ  val, acc \
+	IMULQ R13, acc \
+	ADDQ  R15, acc
+// func Sum64(b []byte) uint64
+TEXT ·Sum64(SB), NOSPLIT, $0-32
+	// Load fixed primes.
+	MOVQ ·prime1v(SB), R13
+	MOVQ ·prime2v(SB), R14
+	MOVQ ·prime4v(SB), R15
+	// Load slice.
+	MOVQ b_base+0(FP), CX
+	MOVQ b_len+8(FP), DX
+	LEAQ (CX)(DX*1), BX
+	// The first loop limit will be len(b)-32.
+	SUBQ $32, BX
+	// Check whether we have at least one block.
+	CMPQ DX, $32
+	JLT  noBlocks
+	// Set up initial state (v1, v2, v3, v4).
+	MOVQ R13, R8
+	ADDQ R14, R8
+	MOVQ R14, R9
+	XORQ R10, R10
+	XORQ R11, R11
+	SUBQ R13, R11
+	// Loop until CX > BX.
+	round(R8)
+	round(R9)
+	round(R10)
+	round(R11)
+	JLE  blockLoop
+	ROLQ $1, AX
+	MOVQ R9, R12
+	ROLQ $7, R12
+	ADDQ R12, AX
+	MOVQ R10, R12
+	ROLQ $12, R12
+	ADDQ R12, AX
+	MOVQ R11, R12
+	ROLQ $18, R12
+	ADDQ R12, AX
+	mergeRound(AX, R8)
+	mergeRound(AX, R9)
+	mergeRound(AX, R10)
+	mergeRound(AX, R11)
+	JMP afterBlocks
+	MOVQ ·prime5v(SB), AX
+	// Right now BX has len(b)-32, and we want to loop until CX > len(b)-8.
+	ADDQ $24, BX
+	JG   fourByte
+	// Calculate k1.
+	MOVQ  (CX), R8
+	ADDQ  $8, CX
+	IMULQ R14, R8
+	ROLQ  $31, R8
+	IMULQ R13, R8
+	XORQ  R8, AX
+	ROLQ  $27, AX
+	ADDQ  R15, AX
+	JLE  wordLoop
+	ADDQ $4, BX
+	JG   singles
+	MOVL  (CX), R8
+	ADDQ  $4, CX
+	IMULQ R13, R8
+	XORQ  R8, AX
+	ROLQ  $23, AX
+	ADDQ  ·prime3v(SB), AX
+	ADDQ $4, BX
+	JGE  finalize
+	ADDQ    $1, CX
+	IMULQ   ·prime5v(SB), R12
+	XORQ    R12, AX
+	ROLQ  $11, AX
+	JL   singlesLoop
+	MOVQ  AX, R12
+	SHRQ  $33, R12
+	XORQ  R12, AX
+	MOVQ  AX, R12
+	SHRQ  $29, R12
+	XORQ  R12, AX
+	IMULQ ·prime3v(SB), AX
+	MOVQ  AX, R12
+	SHRQ  $32, R12
+	XORQ  R12, AX
+	MOVQ AX, ret+24(FP)
+// writeBlocks uses the same registers as above except that it uses AX to store
+// the x pointer.
+// func writeBlocks(x *xxh, b []byte) []byte
+TEXT ·writeBlocks(SB), NOSPLIT, $0-56
+	// Load fixed primes needed for round.
+	MOVQ ·prime1v(SB), R13
+	MOVQ ·prime2v(SB), R14
+	// Load slice.
+	MOVQ b_base+8(FP), CX
+	MOVQ CX, ret_base+32(FP) // initialize return base pointer; see NOTE below
+	MOVQ b_len+16(FP), DX
+	LEAQ (CX)(DX*1), BX
+	SUBQ $32, BX
+	// Load vN from x.
+	MOVQ x+0(FP), AX
+	MOVQ 0(AX), R8   // v1
+	MOVQ 8(AX), R9   // v2
+	MOVQ 16(AX), R10 // v3
+	MOVQ 24(AX), R11 // v4
+	// We don't need to check the loop condition here; this function is
+	// always called with at least one block of data to process.
+	round(R8)
+	round(R9)
+	round(R10)
+	round(R11)
+	JLE  blockLoop
+	// Copy vN back to x.
+	MOVQ R8, 0(AX)
+	MOVQ R9, 8(AX)
+	MOVQ R10, 16(AX)
+	MOVQ R11, 24(AX)
+	// Construct return slice.
+	// NOTE: It's important that we don't construct a slice that has a base
+	// pointer off the end of the original slice, as in Go 1.7+ this will
+	// cause runtime crashes. (See discussion in, for example,
+	// https://github.com/golang/go/issues/16772.)
+	// Therefore, we calculate the length/cap first, and if they're zero, we
+	// keep the old base. This is what the compiler does as well if you
+	// write code like
+	//   b = b[len(b):]
+	// New length is 32 - (CX - BX) -> BX+32 - CX.
+	ADDQ $32, BX
+	JZ   afterSetBase
+	MOVQ CX, ret_base+32(FP)
+	MOVQ BX, ret_len+40(FP)
+	MOVQ BX, ret_cap+48(FP) // set cap == len
new file mode 100644
index 0000000..c68d13f
--- /dev/null
+++ b/vendor/github.com/cespare/xxhash/xxhash_other.go
@@ -0,0 +1,75 @@
+// +build !amd64 appengine !gc purego
+package xxhash
+// Sum64 computes the 64-bit xxHash digest of b.
+func Sum64(b []byte) uint64 {
+	// A simpler version would be
+	//   x := New()
+	//   x.Write(b)
+	//   return x.Sum64()
+	// but this is faster, particularly for small inputs.
+	n := len(b)
+	var h uint64
+	if n >= 32 {
+		v1 := prime1v + prime2
+		v2 := prime2
+		v3 := uint64(0)
+		v4 := -prime1v
+		for len(b) >= 32 {
+			v1 = round(v1, u64(b[0:8:len(b)]))
+			v2 = round(v2, u64(b[8:16:len(b)]))
+			v3 = round(v3, u64(b[16:24:len(b)]))
+			v4 = round(v4, u64(b[24:32:len(b)]))
+			b = b[32:len(b):len(b)]
+		}
+		h = rol1(v1) + rol7(v2) + rol12(v3) + rol18(v4)
+		h = mergeRound(h, v1)
+		h = mergeRound(h, v2)
+		h = mergeRound(h, v3)
+		h = mergeRound(h, v4)
+	} else {
+		h = prime5
+	}
+	h += uint64(n)
+	i, end := 0, len(b)
+	for ; i+8 <= end; i += 8 {
+		k1 := round(0, u64(b[i:i+8:len(b)]))
+		h ^= k1
+		h = rol27(h)*prime1 + prime4
+	}
+	if i+4 <= end {
+		h ^= uint64(u32(b[i:i+4:len(b)])) * prime1
+		h = rol23(h)*prime2 + prime3
+		i += 4
+	}
+	for ; i < end; i++ {
+		h ^= uint64(b[i]) * prime5
+		h = rol11(h) * prime1
+	}
+	h ^= h >> 33
+	h *= prime2
+	h ^= h >> 29
+	h *= prime3
+	h ^= h >> 32
+	return h
+func writeBlocks(x *xxh, b []byte) []byte {
+	v1, v2, v3, v4 := x.v1, x.v2, x.v3, x.v4
+	for len(b) >= 32 {
+		v1 = round(v1, u64(b[0:8:len(b)]))
+		v2 = round(v2, u64(b[8:16:len(b)]))
+		v3 = round(v3, u64(b[16:24:len(b)]))
+		v4 = round(v4, u64(b[24:32:len(b)]))
+		b = b[32:len(b):len(b)]
+	}
+	x.v1, x.v2, x.v3, x.v4 = v1, v2, v3, v4
+	return b
+// +build appengine
+// This file contains the safe implementations of otherwise unsafe-using code.
+package xxhash
+// Sum64String computes the 64-bit xxHash digest of s.
+func Sum64String(s string) uint64 {
+	return Sum64([]byte(s))
diff --git a/vendor/github.com/cespare/xxhash/xxhash_unsafe.go b/vendor/github.com/cespare/xxhash/xxhash_unsafe.go
+// +build !appengine
+// This file encapsulates usage of unsafe.
+// xxhash_safe.go contains the safe implementations.
+package xxhash
+import (
+	"reflect"
+	"unsafe"
+// Sum64String computes the 64-bit xxHash digest of s.
+// It may be faster than Sum64([]byte(s)) by avoiding a copy.
+// TODO(caleb): Consider removing this if an optimization is ever added to make
+// it unnecessary: https://golang.org/issue/2205.
+// TODO(caleb): We still have a function call; we could instead write Go/asm
+// copies of Sum64 for strings to squeeze out a bit more speed.
+func Sum64String(s string) uint64 {
+	// See https://groups.google.com/d/msg/golang-nuts/dcjzJy-bSpw/tcZYBzQqAQAJ
+	// for some discussion about this unsafe conversion.
+	var b []byte
+	bh := (*reflect.SliceHeader)(unsafe.Pointer(&b))
+	bh.Data = (*reflect.StringHeader)(unsafe.Pointer(&s)).Data
+	bh.Len = len(s)
+	bh.Cap = len(s)
+	return Sum64(b)
 import (
+	"github.com/opencord/voltha-lib-go/v3/pkg/db"
@@ -32,14 +33,17 @@
 	kafkaICProxy kafka.InterContainerProxy
 	adapterTopic string
 	coreTopic    string
+	endpointMgr  kafka.EndpointManager
-func NewAdapterProxy(kafkaProxy kafka.InterContainerProxy, adapterTopic string, coreTopic string) *AdapterProxy {
-	var proxy AdapterProxy
-	proxy.kafkaICProxy = kafkaProxy
-	proxy.adapterTopic = adapterTopic
-	proxy.coreTopic = coreTopic
-	logger.Debugw("TOPICS", log.Fields{"core": proxy.coreTopic, "adapter": proxy.adapterTopic})
+func NewAdapterProxy(kafkaProxy kafka.InterContainerProxy, adapterTopic string, coreTopic string, backend *db.Backend) *AdapterProxy {
+	proxy := AdapterProxy{
+		kafkaICProxy: kafkaProxy,
+		adapterTopic: adapterTopic,
+		coreTopic:    coreTopic,
+		endpointMgr:  kafka.NewEndpointManager(backend),
+	}
+	logger.Debugw("topics", log.Fields{"core": proxy.coreTopic, "adapter": proxy.adapterTopic})
 	return &proxy
@@ -87,7 +91,11 @@
 	// Set up the required rpc arguments
-	topic := kafka.Topic{Name: toAdapter}
+	endpoint, err := ap.endpointMgr.GetEndpoint(toDeviceId, toAdapter)
+	if err != nil {
+		return err
+	}
+	topic := kafka.Topic{Name: string(endpoint)}
 	replyToTopic := kafka.Topic{Name: fromAdapter}
 	rpc := "process_inter_adapter_message"
 	topic := kafka.Topic{Name: ap.coreTopic}
 	replyToTopic := ap.getAdapterTopic()
 	args := make([]*kafka.KVArg, 2)
+	if adapter.TotalReplicas == 0 && adapter.CurrentReplica != 0 {
+		log.Fatal("totalReplicas can't be 0, since you're here you have at least one")
+	}
+	if adapter.CurrentReplica == 0 && adapter.TotalReplicas != 0 {
+		log.Fatal("currentReplica can't be 0, it has to start from 1")
+	}
+	if adapter.CurrentReplica == 0 && adapter.TotalReplicas == 0 {
+		// if the adapter is not setting these fields they default to 0,
+		// in that case it means the adapter is not ready to be scaled and thus it defaults
+		// to a single instance
+		adapter.CurrentReplica = 1
+		adapter.TotalReplicas = 1
+	}
+	if adapter.CurrentReplica > adapter.TotalReplicas {
+		log.Fatalf("CurrentReplica (%d) can't be greater than TotalReplicas (%d)",
+			adapter.CurrentReplica, adapter.TotalReplicas)
+	}
 	args[0] = &kafka.KVArg{
 		Key:   "adapter",
 		Value: adapter,
 	ConfigAttribute string
-// ConfigManager is a wrapper over backend to maintain Configuration of voltha components
+// ConfigManager is a wrapper over Backend to maintain Configuration of voltha components
 // in kvstore based persistent storage
 type ConfigManager struct {
-	backend             *db.Backend
+	Backend             *db.Backend
 	KvStoreConfigPrefix string
@@ -95,7 +95,7 @@
 	return &ConfigManager{
 		KvStoreConfigPrefix: defaultkvStoreConfigPath,
-		backend: &db.Backend{
+		Backend: &db.Backend{
 			Client:     kvClient,
 			StoreType:  kvStoreType,
 			Host:       kvStoreHost,
@@ -108,12 +108,12 @@
 // RetrieveComponentList list the component Names for which loglevel is stored in kvstore
 func (c *ConfigManager) RetrieveComponentList(ctx context.Context, configType ConfigType) ([]string, error) {
-	data, err := c.backend.List(ctx, c.KvStoreConfigPrefix)
+	data, err := c.Backend.List(ctx, c.KvStoreConfigPrefix)
 	if err != nil {
 		return nil, err
-	// Looping through the data recieved from the backend for config
+	// Looping through the data recieved from the Backend for config
 	// Trimming and Splitting the required key and value from data and  storing as componentName,PackageName and Level
 	// For Example, recieved key would be <Backend Prefix Path>/<Config Prefix>/<Component Name>/<Config Type>/default and value \"DEBUG\"
 	// Then in default will be stored as PackageName,componentName as <Component Name> and DEBUG will be stored as value in List struct
@@ -168,14 +168,14 @@
 	c.changeEventChan = make(chan *ConfigChangeEvent, 1)
-	c.kvStoreEventChan = c.cManager.backend.CreateWatch(ctx, key, true)
+	c.kvStoreEventChan = c.cManager.Backend.CreateWatch(ctx, key, true)
 	go c.processKVStoreWatchEvents()
 	return c.changeEventChan
-// processKVStoreWatchEvents process event channel recieved from the backend for any ChangeType
+// processKVStoreWatchEvents process event channel recieved from the Backend for any ChangeType
 // It checks for the EventType is valid or not.For the valid EventTypes creates ConfigChangeEvent and send it on channel
 func (c *ComponentConfig) processKVStoreWatchEvents() {
@@ -183,7 +183,7 @@
 	logger.Debugw("processing-kvstore-event-change", log.Fields{"key-prefix": ccKeyPrefix})
-	ccPathPrefix := c.cManager.backend.PathPrefix + ccKeyPrefix + kvStorePathSeparator
+	ccPathPrefix := c.cManager.Backend.PathPrefix + ccKeyPrefix + kvStorePathSeparator
 	for watchResp := range c.kvStoreEventChan {
@@ -210,7 +210,7 @@
 	logger.Debugw("retrieving-config", log.Fields{"key": key})
-	if kvpair, err := c.cManager.backend.Get(ctx, key); err != nil {
+	if kvpair, err := c.cManager.Backend.Get(ctx, key); err != nil {
 		return "", err
 	} else {
 		if kvpair == nil {
@@ -228,17 +228,17 @@
 	logger.Debugw("retreiving-list", log.Fields{"key": key})
-	data, err := c.cManager.backend.List(ctx, key)
+	data, err := c.cManager.Backend.List(ctx, key)
 	if err != nil {
 		return nil, err
-	// Looping through the data recieved from the backend for the given key
+	// Looping through the data recieved from the Backend for the given key
 	// Trimming the required key and value from data and  storing as key/value pair
 	// For Example, recieved key would be <Backend Prefix Path>/<Config Prefix>/<Component Name>/<Config Type>/default and value \"DEBUG\"
 	// Then in default will be stored as key and DEBUG will be stored as value in map[string]string
 	res := make(map[string]string)
-	ccPathPrefix := c.cManager.backend.PathPrefix + kvStorePathSeparator + key + kvStorePathSeparator
+	ccPathPrefix := c.cManager.Backend.PathPrefix + kvStorePathSeparator + key + kvStorePathSeparator
 	for attr, val := range data {
 		res[strings.TrimPrefix(attr, ccPathPrefix)] = strings.Trim(fmt.Sprintf("%s", val.Value), "\"")
@@ -252,7 +252,7 @@
 	logger.Debugw("saving-config", log.Fields{"key": key, "value": configValue})
 	//save the data for update config
-	if err := c.cManager.backend.Put(ctx, key, configValue); err != nil {
+	if err := c.cManager.Backend.Put(ctx, key, configValue); err != nil {
 		return err
 	return nil
@@ -264,7 +264,7 @@
 	logger.Debugw("deleting-config", log.Fields{"key": key})
 	//delete the config
-	if err := c.cManager.backend.Delete(ctx, key); err != nil {
+	if err := c.cManager.Backend.Delete(ctx, key); err != nil {
 		return err
 	return nil
 // Package Config provides dynamic logging configuration for specific Voltha component with loglevel lookup
-// from etcd kvstore implemented using backend.
+// from etcd kvstore implemented using Backend.
 // Any Voltha component can start utilizing dynamic logging by starting goroutine of StartLogLevelConfigProcessing after
 // starting kvClient for the component.
@@ -121,8 +121,8 @@
 // ProcessLogConfig will first load and apply log config and then start waiting on component config and global config
-// channels for any changes. Event channel will be recieved from backend for valid change type
-// Then data for componentn log config and global log config will be retrieved from backend and stored in updatedLogConfig in precedence order
+// channels for any changes. Event channel will be recieved from Backend for valid change type
+// Then data for componentn log config and global log config will be retrieved from Backend and stored in updatedLogConfig in precedence order
 // If any changes in updatedLogConfig will be applied on component
 func (c *ComponentLogController) processLogConfig(ctx context.Context) {
@@ -247,7 +247,7 @@
 	return componentLogConfig, nil
-// buildUpdatedLogConfig retrieve the global logConfig and component logConfig  from backend
+// buildUpdatedLogConfig retrieve the global logConfig and component logConfig  from Backend
 // component logConfig stores the log config with precedence order
 // For example, If the global logConfig is set and component logConfig is set only for specific package then
 // component logConfig is stored with global logConfig  and component logConfig of specific package
+ * Copyright 2020-present Open Networking Foundation
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka
+import (
+	"context"
+	"fmt"
+	"github.com/buraksezer/consistent"
+	"github.com/cespare/xxhash"
+	"github.com/golang/protobuf/proto"
+	"github.com/opencord/voltha-lib-go/v3/pkg/db"
+	"github.com/opencord/voltha-lib-go/v3/pkg/log"
+	"github.com/opencord/voltha-protos/v3/go/voltha"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+	"sync"
+const (
+	// All the values below can be tuned to get optimal data distribution.  The numbers below seems to work well when
+	// supporting 1000-10000 devices and 1 - 20 replicas of a service
+	// Keys are distributed among partitions. Prime numbers are good to distribute keys uniformly.
+	DefaultPartitionCount = 1117
+	// Represents how many times a node is replicated on the consistent ring.
+	DefaultReplicationFactor = 117
+	// Load is used to calculate average load.
+	DefaultLoad = 1.1
+type Endpoint string // Endpoint of a service instance.  When using kafka, this is the topic name of a service
+type ReplicaID int32 // The replication ID of a service instance
+type EndpointManager interface {
+	// GetEndpoint is called to get the endpoint to communicate with for a specific device and service type.  For
+	// now this will return the topic name
+	GetEndpoint(deviceID string, serviceType string) (Endpoint, error)
+	// IsDeviceOwnedByService is invoked when a specific service (service type + replicaNumber) is restarted and
+	// devices owned by that service need to be reconciled
+	IsDeviceOwnedByService(deviceID string, serviceType string, replicaNumber int32) (bool, error)
+	// getReplicaAssignment returns the replica number of the service that owns the deviceID.  This is used by the
+	// test only
+	getReplicaAssignment(deviceID string, serviceType string) (ReplicaID, error)
+type service struct {
+	id             string // Id of the service.  The same id is used for all replicas
+	totalReplicas  int32
+	replicas       map[ReplicaID]Endpoint
+	consistentRing *consistent.Consistent
+type endpointManager struct {
+	partitionCount           int
+	replicationFactor        int
+	load                     float64
+	backend                  *db.Backend
+	services                 map[string]*service
+	servicesLock             sync.RWMutex
+	deviceTypeServiceMap     map[string]string
+	deviceTypeServiceMapLock sync.RWMutex
+type EndpointManagerOption func(*endpointManager)
+func PartitionCount(count int) EndpointManagerOption {
+	return func(args *endpointManager) {
+		args.partitionCount = count
+	}
+func ReplicationFactor(replicas int) EndpointManagerOption {
+	return func(args *endpointManager) {
+		args.replicationFactor = replicas
+	}
+func Load(load float64) EndpointManagerOption {
+	return func(args *endpointManager) {
+		args.load = load
+	}
+func newEndpointManager(backend *db.Backend, opts ...EndpointManagerOption) EndpointManager {
+	tm := &endpointManager{
+		partitionCount:       DefaultPartitionCount,
+		replicationFactor:    DefaultReplicationFactor,
+		load:                 DefaultLoad,
+		backend:              backend,
+		services:             make(map[string]*service),
+		deviceTypeServiceMap: make(map[string]string),
+	}
+	for _, option := range opts {
+		option(tm)
+	}
+	return tm
+func NewEndpointManager(backend *db.Backend, opts ...EndpointManagerOption) EndpointManager {
+	return newEndpointManager(backend, opts...)
+func (ep *endpointManager) GetEndpoint(deviceID string, serviceType string) (Endpoint, error) {
+	logger.Debugw("getting-endpoint", log.Fields{"device-id": deviceID, "service": serviceType})
+	owner, err := ep.getOwner(deviceID, serviceType)
+	if err != nil {
+		return "", err
+	}
+	m, ok := owner.(Member)
+	if !ok {
+		return "", status.Errorf(codes.Aborted, "invalid-member-%v", owner)
+	}
+	endpoint := m.getEndPoint()
+	if endpoint == "" {
+		return "", status.Errorf(codes.Unavailable, "endpoint-not-set-%s", serviceType)
+	}
+	logger.Debugw("returning-endpoint", log.Fields{"device-id": deviceID, "service": serviceType, "endpoint": endpoint})
+	return endpoint, nil
+func (ep *endpointManager) IsDeviceOwnedByService(deviceID string, serviceType string, replicaNumber int32) (bool, error) {
+	logger.Debugw("device-ownership", log.Fields{"device-id": deviceID, "service": serviceType, "replica-number": replicaNumber})
+	owner, err := ep.getOwner(deviceID, serviceType)
+	if err != nil {
+		return false, nil
+	}
+	m, ok := owner.(Member)
+	if !ok {
+		return false, status.Errorf(codes.Aborted, "invalid-member-%v", owner)
+	}
+	return m.getReplica() == ReplicaID(replicaNumber), nil
+func (ep *endpointManager) getReplicaAssignment(deviceID string, serviceType string) (ReplicaID, error) {
+	owner, err := ep.getOwner(deviceID, serviceType)
+	if err != nil {
+		return 0, nil
+	}
+	m, ok := owner.(Member)
+	if !ok {
+		return 0, status.Errorf(codes.Aborted, "invalid-member-%v", owner)
+	}
+	return m.getReplica(), nil
+func (ep *endpointManager) getOwner(deviceID string, serviceType string) (consistent.Member, error) {
+	serv, dType, err := ep.getServiceAndDeviceType(serviceType)
+	if err != nil {
+		return nil, err
+	}
+	key := ep.makeKey(deviceID, dType, serviceType)
+	return serv.consistentRing.LocateKey(key), nil
+func (ep *endpointManager) getServiceAndDeviceType(serviceType string) (*service, string, error) {
+	// Check whether service exist
+	ep.servicesLock.RLock()
+	serv, serviceExist := ep.services[serviceType]
+	ep.servicesLock.RUnlock()
+	// Load the service and device types if needed
+	if !serviceExist || serv == nil || int(serv.totalReplicas) != len(serv.consistentRing.GetMembers()) {
+		if err := ep.loadServices(); err != nil {
+			return nil, "", err
+		}
+		// Check whether the service exists now
+		ep.servicesLock.RLock()
+		serv, serviceExist = ep.services[serviceType]
+		ep.servicesLock.RUnlock()
+		if !serviceExist || serv == nil || int(serv.totalReplicas) != len(serv.consistentRing.GetMembers()) {
+			return nil, "", status.Errorf(codes.NotFound, "service-%s", serviceType)
+		}
+	}
+	ep.deviceTypeServiceMapLock.RLock()
+	defer ep.deviceTypeServiceMapLock.RUnlock()
+	for dType, sType := range ep.deviceTypeServiceMap {
+		if sType == serviceType {
+			return serv, dType, nil
+		}
+	}
+	return nil, "", status.Errorf(codes.NotFound, "service-%s", serviceType)
+func (ep *endpointManager) getConsistentConfig() consistent.Config {
+	return consistent.Config{
+		PartitionCount:    ep.partitionCount,
+		ReplicationFactor: ep.replicationFactor,
+		Load:              ep.load,
+		Hasher:            hasher{},
+	}
+// loadServices loads the services (adapters) and device types in memory. Because of the small size of the data and
+// the data format in the dB being binary protobuf then it is better to load all the data if inconsistency is detected,
+// instead of watching for updates in the dB and acting on it.
+func (ep *endpointManager) loadServices() error {
+	ep.servicesLock.Lock()
+	defer ep.servicesLock.Unlock()
+	ep.deviceTypeServiceMapLock.Lock()
+	defer ep.deviceTypeServiceMapLock.Unlock()
+	if ep.backend == nil {
+		return status.Error(codes.Aborted, "backend-not-set")
+	}
+	ep.services = make(map[string]*service)
+	ep.deviceTypeServiceMap = make(map[string]string)
+	// Load the adapters
+	blobs, err := ep.backend.List(context.Background(), "adapters")
+	if err != nil {
+		return err
+	}
+	// Data is marshalled as proto bytes in the data store
+	for _, blob := range blobs {
+		data := blob.Value.([]byte)
+		adapter := &voltha.Adapter{}
+		if err := proto.Unmarshal(data, adapter); err != nil {
+			return err
+		}
+		// A valid adapter should have the vendorID set
+		if adapter.Vendor != "" {
+			if _, ok := ep.services[adapter.Type]; !ok {
+				ep.services[adapter.Type] = &service{
+					id:             adapter.Type,
+					totalReplicas:  adapter.TotalReplicas,
+					replicas:       make(map[ReplicaID]Endpoint),
+					consistentRing: consistent.New(nil, ep.getConsistentConfig()),
+				}
+			}
+			currentReplica := ReplicaID(adapter.CurrentReplica)
+			endpoint := Endpoint(adapter.Endpoint)
+			ep.services[adapter.Type].replicas[currentReplica] = endpoint
+			ep.services[adapter.Type].consistentRing.Add(newMember(adapter.Id, adapter.Type, adapter.Vendor, endpoint, adapter.Version, currentReplica))
+		}
+	}
+	// Load the device types
+	blobs, err = ep.backend.List(context.Background(), "device_types")
+	if err != nil {
+		return err
+	}
+	for _, blob := range blobs {
+		data := blob.Value.([]byte)
+		deviceType := &voltha.DeviceType{}
+		if err := proto.Unmarshal(data, deviceType); err != nil {
+			return err
+		}
+		if _, ok := ep.deviceTypeServiceMap[deviceType.Id]; !ok {
+			ep.deviceTypeServiceMap[deviceType.Id] = deviceType.Adapter
+		}
+	}
+	// Log the loaded data in debug mode to facilitate trouble shooting
+	if logger.V(log.DebugLevel) {
+		for key, val := range ep.services {
+			members := val.consistentRing.GetMembers()
+			logger.Debugw("service", log.Fields{"service": key, "expected-replica": val.totalReplicas, "replicas": len(val.consistentRing.GetMembers())})
+			for _, m := range members {
+				n := m.(Member)
+				logger.Debugw("service-loaded", log.Fields{"serviceId": n.getID(), "serviceType": n.getServiceType(), "replica": n.getReplica(), "endpoint": n.getEndPoint()})
+			}
+		}
+		logger.Debugw("device-types-loaded", log.Fields{"device-types": ep.deviceTypeServiceMap})
+	}
+	return nil
+// makeKey creates the string that the hash function uses to create the hash
+func (ep *endpointManager) makeKey(deviceID string, deviceType string, serviceType string) []byte {
+	return []byte(fmt.Sprintf("%s_%s_%s", serviceType, deviceType, deviceID))
+// The consistent package requires a hasher function
+type hasher struct{}
+// Sum64 provides the hasher function.  Based upon numerous testing scenarios, the xxhash package seems to provide the
+// best distribution compare to other hash packages
+func (h hasher) Sum64(data []byte) uint64 {
+	return xxhash.Sum64(data)
+// Member represents a member on the consistent ring
+type Member interface {
+	String() string
+	getReplica() ReplicaID
+	getEndPoint() Endpoint
+	getID() string
+	getServiceType() string
+// member implements the Member interface
+type member struct {
+	id          string
+	serviceType string
+	vendor      string
+	version     string
+	replica     ReplicaID
+	endpoint    Endpoint
+func newMember(ID string, serviceType string, vendor string, endPoint Endpoint, version string, replica ReplicaID) Member {
+	return &member{
+		id:          ID,
+		serviceType: serviceType,
+		vendor:      vendor,
+		version:     version,
+		replica:     replica,
+		endpoint:    endPoint,
+	}
+func (m *member) String() string {
+	return string(m.endpoint)
+func (m *member) getReplica() ReplicaID {
+	return m.replica
+func (m *member) getEndPoint() Endpoint {
+	return m.endpoint
+func (m *member) getID() string {
+	return m.id
+func (m *member) getServiceType() string {
+	return m.serviceType
diff --git a/vendor/github.com/opencord/voltha-protos/v3/go/voltha/adapter.pb.go b/vendor/github.com/opencord/voltha-protos/v3/go/voltha/adapter.pb.go
 // Adapter (software plugin)
 type Adapter struct {
-	// Unique name of adapter, matching the python package name under
-	// voltha/adapters.
+	// the adapter ID has to be unique,
+	// it will be generated as Type + CurrentReplica
 	Id      string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
 	Vendor  string `protobuf:"bytes,2,opt,name=vendor,proto3" json:"vendor,omitempty"`
 	Version string `protobuf:"bytes,3,opt,name=version,proto3" json:"version,omitempty"`
@@ -76,10 +76,16 @@
 	AdditionalDescription *any.Any `protobuf:"bytes,64,opt,name=additional_description,json=additionalDescription,proto3" json:"additional_description,omitempty"`
 	LogicalDeviceIds      []string `protobuf:"bytes,4,rep,name=logical_device_ids,json=logicalDeviceIds,proto3" json:"logical_device_ids,omitempty"`
 	// timestamp when the adapter last sent a message to the core
-	LastCommunication    *timestamp.Timestamp `protobuf:"bytes,5,opt,name=last_communication,json=lastCommunication,proto3" json:"last_communication,omitempty"`
-	XXX_NoUnkeyedLiteral struct{}             `json:"-"`
-	XXX_unrecognized     []byte               `json:"-"`
-	XXX_sizecache        int32                `json:"-"`
+	LastCommunication *timestamp.Timestamp `protobuf:"bytes,5,opt,name=last_communication,json=lastCommunication,proto3" json:"last_communication,omitempty"`
+	CurrentReplica    int32                `protobuf:"varint,6,opt,name=currentReplica,proto3" json:"currentReplica,omitempty"`
+	TotalReplicas     int32                `protobuf:"varint,7,opt,name=totalReplicas,proto3" json:"totalReplicas,omitempty"`
+	Endpoint          string               `protobuf:"bytes,8,opt,name=endpoint,proto3" json:"endpoint,omitempty"`
+	// all replicas of the same adapter will have the same type
+	// it is used to associate a device to an adapter
+	Type                 string   `protobuf:"bytes,9,opt,name=type,proto3" json:"type,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
 func (m *Adapter) Reset()         { *m = Adapter{} }
@@ -156,6 +162,34 @@
 	return nil
+func (m *Adapter) GetCurrentReplica() int32 {
+	if m != nil {
+		return m.CurrentReplica
+	}
+	return 0
+func (m *Adapter) GetTotalReplicas() int32 {
+	if m != nil {
+		return m.TotalReplicas
+	}
+	return 0
+func (m *Adapter) GetEndpoint() string {
+	if m != nil {
+		return m.Endpoint
+	}
+	return ""
+func (m *Adapter) GetType() string {
+	if m != nil {
+		return m.Type
+	}
+	return ""
 type Adapters struct {
 	Items                []*Adapter `protobuf:"bytes,1,rep,name=items,proto3" json:"items,omitempty"`
 	XXX_NoUnkeyedLiteral struct{}   `json:"-"`
@@ -204,31 +238,34 @@
 func init() { proto.RegisterFile("voltha_protos/adapter.proto", fileDescriptor_7e998ce153307274) }
 var fileDescriptor_7e998ce153307274 = []byte{
-	// 405 bytes of a gzipped FileDescriptorProto
-	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x92, 0xd1, 0x6a, 0xdb, 0x30,
-	0x14, 0x86, 0x71, 0xb2, 0xb8, 0xab, 0x4a, 0x59, 0xaa, 0x2d, 0xc3, 0xf3, 0x28, 0x35, 0x81, 0x81,
-	0x2f, 0x56, 0x99, 0xb5, 0x2f, 0xb0, 0xa4, 0xbd, 0xe9, 0xad, 0x28, 0xbb, 0xd8, 0x8d, 0x51, 0x24,
-	0xd5, 0x15, 0xd8, 0x3a, 0xc6, 0x52, 0x0c, 0x7d, 0xc8, 0xbd, 0xc1, 0x1e, 0x60, 0x4f, 0xb0, 0xeb,
-	0x11, 0x49, 0x26, 0x4e, 0x06, 0xbd, 0x32, 0xfa, 0xbf, 0xff, 0xfc, 0xe7, 0x1c, 0xc9, 0xe8, 0x73,
-	0x0f, 0xb5, 0x7d, 0x66, 0x65, 0xdb, 0x81, 0x05, 0x53, 0x30, 0xc1, 0x5a, 0x2b, 0x3b, 0xe2, 0x8e,
-	0x38, 0xf6, 0x30, 0xfd, 0x54, 0x01, 0x54, 0xb5, 0x2c, 0x9c, 0xba, 0xd9, 0x3e, 0x15, 0x4c, 0xbf,
-	0x78, 0x4b, 0x9a, 0x1e, 0xd6, 0x73, 0x68, 0x1a, 0xd0, 0x81, 0x25, 0x87, 0xac, 0x91, 0x96, 0x05,
-	0x72, 0x75, 0x1c, 0x68, 0x55, 0x23, 0x8d, 0x65, 0x4d, 0xeb, 0x0d, 0x4b, 0x8a, 0xce, 0x57, 0x7e,
-	0x94, 0x3b, 0xd0, 0x4f, 0xaa, 0xc2, 0x2b, 0x74, 0xc1, 0x84, 0x50, 0x56, 0x81, 0x66, 0x75, 0xc9,
-	0x9d, 0x98, 0x7c, 0xcf, 0xa2, 0xfc, 0xec, 0xe6, 0x03, 0xf1, 0x69, 0x64, 0x48, 0x23, 0x2b, 0xfd,
-	0x42, 0xe7, 0x7b, 0xbb, 0x8f, 0x58, 0xfe, 0x9e, 0xa0, 0x93, 0x10, 0x8a, 0x17, 0x68, 0xa2, 0x44,
-	0x12, 0x65, 0x51, 0x7e, 0xba, 0x9e, 0xfd, 0xf9, 0xfb, 0xeb, 0x32, 0xa2, 0x13, 0x25, 0xf0, 0x25,
-	0x8a, 0x7b, 0xa9, 0x05, 0x74, 0xc9, 0x64, 0x8c, 0x82, 0x88, 0xaf, 0xd0, 0x49, 0x2f, 0x3b, 0xa3,
-	0x40, 0x27, 0xd3, 0x31, 0x1f, 0x54, 0x7c, 0x8d, 0xe2, 0x30, 0xda, 0xdc, 0x8d, 0xb6, 0x20, 0xfe,
-	0x0a, 0xc8, 0xc1, 0x32, 0x34, 0x98, 0x30, 0x45, 0x1f, 0x47, 0x4b, 0x09, 0x69, 0x78, 0xa7, 0xda,
-	0xdd, 0xe9, 0xb5, 0xcd, 0x86, 0xa6, 0x8b, 0x7d, 0xe9, 0xfd, 0xbe, 0x12, 0x7f, 0x45, 0xb8, 0x86,
-	0x4a, 0x71, 0x17, 0xd8, 0x2b, 0x2e, 0x4b, 0x25, 0x4c, 0xf2, 0x26, 0x9b, 0xe6, 0xa7, 0x74, 0x1e,
-	0xc8, 0xbd, 0x03, 0x0f, 0xc2, 0xe0, 0x07, 0x84, 0x6b, 0x66, 0x6c, 0xb9, 0x7b, 0xb7, 0xad, 0x56,
-	0x9c, 0xb9, 0xee, 0x33, 0xd7, 0x3d, 0xfd, 0xaf, 0xfb, 0xe3, 0xf0, 0x4a, 0xf4, 0x62, 0x57, 0x75,
-	0x37, 0x2e, 0x5a, 0x7e, 0x43, 0x6f, 0xc3, 0x96, 0x06, 0x7f, 0x41, 0x33, 0x65, 0x65, 0x63, 0x92,
-	0x28, 0x9b, 0xe6, 0x67, 0x37, 0xef, 0x8e, 0xae, 0x81, 0x7a, 0xba, 0x7e, 0x44, 0xef, 0xa1, 0xab,
-	0x08, 0xb4, 0x52, 0x73, 0xe8, 0x44, 0x70, 0xad, 0xcf, 0x7f, 0xb8, 0x6f, 0x30, 0xff, 0x24, 0x95,
-	0xb2, 0xcf, 0xdb, 0x0d, 0xe1, 0xd0, 0x14, 0x83, 0xb5, 0xf0, 0xd6, 0xeb, 0xf0, 0x6b, 0xf5, 0xb7,
-	0x45, 0x05, 0x41, 0xdb, 0xc4, 0x4e, 0xbc, 0xfd, 0x17, 0x00, 0x00, 0xff, 0xff, 0x4e, 0xc5, 0xdf,
-	0x09, 0xdb, 0x02, 0x00, 0x00,
+	// 463 bytes of a gzipped FileDescriptorProto
+	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x52, 0xdd, 0x6e, 0xd3, 0x30,
+	0x14, 0x56, 0xda, 0x35, 0x6d, 0x3d, 0x15, 0x3a, 0x43, 0x91, 0x09, 0x9a, 0x16, 0x55, 0x80, 0x72,
+	0xc1, 0x12, 0xb1, 0xbd, 0x00, 0xed, 0x76, 0xb3, 0x5b, 0x6b, 0xe2, 0x82, 0x9b, 0xca, 0xb5, 0xbd,
+	0xcc, 0x52, 0xe2, 0x13, 0xc5, 0x6e, 0xa4, 0x3e, 0x24, 0x2f, 0xc0, 0x13, 0xf0, 0x04, 0x5c, 0xa3,
+	0x3a, 0x0e, 0xfd, 0x41, 0xda, 0x55, 0x72, 0xbe, 0xef, 0x3b, 0xdf, 0xf9, 0x33, 0xfa, 0xd0, 0x40,
+	0x61, 0x9f, 0xd9, 0xaa, 0xaa, 0xc1, 0x82, 0xc9, 0x98, 0x60, 0x95, 0x95, 0x75, 0xea, 0x42, 0x1c,
+	0xb6, 0x64, 0xf4, 0x3e, 0x07, 0xc8, 0x0b, 0x99, 0x39, 0x74, 0xbd, 0x79, 0xca, 0x98, 0xde, 0xb6,
+	0x92, 0x28, 0x3a, 0xce, 0xe7, 0x50, 0x96, 0xa0, 0x3d, 0x47, 0x8e, 0xb9, 0x52, 0x5a, 0xe6, 0x99,
+	0xab, 0x53, 0x43, 0xab, 0x4a, 0x69, 0x2c, 0x2b, 0xab, 0x56, 0x30, 0xa7, 0x68, 0xb2, 0x68, 0x5b,
+	0xb9, 0x03, 0xfd, 0xa4, 0x72, 0xbc, 0x40, 0x17, 0x4c, 0x08, 0x65, 0x15, 0x68, 0x56, 0xac, 0xb8,
+	0x03, 0xc9, 0xb7, 0x38, 0x48, 0xce, 0x6f, 0xde, 0xa6, 0xad, 0x5b, 0xda, 0xb9, 0xa5, 0x0b, 0xbd,
+	0xa5, 0xd3, 0xbd, 0xbc, 0xb5, 0x98, 0xff, 0xea, 0xa3, 0xa1, 0x37, 0xc5, 0x33, 0xd4, 0x53, 0x82,
+	0x04, 0x71, 0x90, 0x8c, 0x97, 0x83, 0xdf, 0x7f, 0x7e, 0x5e, 0x06, 0xb4, 0xa7, 0x04, 0xbe, 0x44,
+	0x61, 0x23, 0xb5, 0x80, 0x9a, 0xf4, 0x0e, 0x29, 0x0f, 0xe2, 0x2b, 0x34, 0x6c, 0x64, 0x6d, 0x14,
+	0x68, 0xd2, 0x3f, 0xe4, 0x3b, 0x14, 0x5f, 0xa3, 0xd0, 0xb7, 0x36, 0x75, 0xad, 0xcd, 0xd2, 0x76,
+	0x05, 0xe9, 0xd1, 0x30, 0xd4, 0x8b, 0x30, 0x45, 0xef, 0x0e, 0x86, 0x12, 0xd2, 0xf0, 0x5a, 0x55,
+	0xbb, 0xe8, 0xa5, 0xc9, 0xba, 0xa2, 0xb3, 0x7d, 0xea, 0xfd, 0x3e, 0x13, 0x7f, 0x41, 0xb8, 0x80,
+	0x5c, 0x71, 0x67, 0xd8, 0x28, 0x2e, 0x57, 0x4a, 0x18, 0x72, 0x16, 0xf7, 0x93, 0x31, 0x9d, 0x7a,
+	0xe6, 0xde, 0x11, 0x0f, 0xc2, 0xe0, 0x07, 0x84, 0x0b, 0x66, 0xec, 0x6a, 0x77, 0xb7, 0x8d, 0x56,
+	0x9c, 0xb9, 0xea, 0x03, 0x57, 0x3d, 0xfa, 0xaf, 0xfa, 0x63, 0x77, 0x25, 0x7a, 0xb1, 0xcb, 0xba,
+	0x3b, 0x4c, 0xc2, 0x9f, 0xd1, 0x2b, 0xbe, 0xa9, 0x6b, 0xa9, 0x2d, 0x95, 0x55, 0xa1, 0x38, 0x23,
+	0x61, 0x1c, 0x24, 0x03, 0x7a, 0x82, 0xe2, 0x8f, 0x68, 0x62, 0xc1, 0xb2, 0xc2, 0xc7, 0x86, 0x0c,
+	0x9d, 0xec, 0x18, 0xc4, 0x11, 0x1a, 0x49, 0x2d, 0x2a, 0x50, 0xda, 0x92, 0xd1, 0x6e, 0xd7, 0xf4,
+	0x5f, 0x8c, 0x31, 0x3a, 0xb3, 0xdb, 0x4a, 0x92, 0xb1, 0xc3, 0xdd, 0xff, 0xfc, 0x2b, 0x1a, 0xf9,
+	0x1d, 0x1b, 0xfc, 0x09, 0x0d, 0x94, 0x95, 0xa5, 0x21, 0x41, 0xdc, 0x4f, 0xce, 0x6f, 0x5e, 0x9f,
+	0x1c, 0x81, 0xb6, 0xec, 0xf2, 0x11, 0xbd, 0x81, 0x3a, 0x4f, 0xa1, 0x92, 0x9a, 0x43, 0x2d, 0xbc,
+	0x6a, 0x39, 0xf9, 0xee, 0xbe, 0x5e, 0xfc, 0x23, 0xcd, 0x95, 0x7d, 0xde, 0xac, 0x53, 0x0e, 0x65,
+	0xd6, 0x49, 0xb3, 0x56, 0x7a, 0xed, 0x1f, 0x76, 0x73, 0x9b, 0xe5, 0xe0, 0xb1, 0x75, 0xe8, 0xc0,
+	0xdb, 0xbf, 0x01, 0x00, 0x00, 0xff, 0xff, 0xef, 0x64, 0x5e, 0x10, 0x59, 0x03, 0x00, 0x00,
 # github.com/bsm/sarama-cluster v2.1.15+incompatible
+# github.com/buraksezer/consistent v0.0.0-20191006190839-693edf70fd72
 # github.com/cenkalti/backoff/v3 v3.1.1
+# github.com/cespare/xxhash v1.1.0
 # github.com/cevaris/ordered_map v0.0.0-20190319150403-3adeae072e73
 # github.com/coreos/go-systemd v0.0.0-20190620071333-e64a0ec8b42a
@@ -63,7 +67,7 @@
 # github.com/mitchellh/mapstructure v1.1.2
-# github.com/opencord/voltha-lib-go/v3 v3.1.0
+# github.com/opencord/voltha-lib-go/v3 v3.1.1
@@ -78,7 +82,7 @@
-# github.com/opencord/voltha-protos/v3 v3.2.8
+# github.com/opencord/voltha-protos/v3 v3.3.0