[VOL-2836] Using different topic per ONU device
Change-Id: Ic0328f789c19dfedd86616df527c5ba510de0de9
diff --git a/VERSION b/VERSION
index dfb9edc..005119b 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-2.4.1-dev
+2.4.1
diff --git a/cmd/openolt-adapter/main.go b/cmd/openolt-adapter/main.go
index a31b520..a0c4c50 100644
--- a/cmd/openolt-adapter/main.go
+++ b/cmd/openolt-adapter/main.go
@@ -109,6 +109,8 @@
p.UpdateStatus("message-bus", probe.ServiceStatusRunning)
}
+ // setup endpointManager
+
// Start the common InterContainer Proxy - retries indefinitely
if a.kip, err = a.startInterContainerProxy(ctx, -1); err != nil {
logger.Fatal("error-starting-inter-container-proxy")
@@ -118,7 +120,7 @@
a.coreProxy = com.NewCoreProxy(a.kip, a.config.Topic, a.config.CoreTopic)
// Create the adaptor proxy to handle request between olt and onu
- a.adapterProxy = com.NewAdapterProxy(a.kip, "brcm_openomci_onu", a.config.CoreTopic)
+ a.adapterProxy = com.NewAdapterProxy(a.kip, "brcm_openomci_onu", a.config.CoreTopic, cm.Backend)
// Create the event proxy to post events to KAFKA
a.eventProxy = com.NewEventProxy(com.MsgClient(a.kafkaClient), com.MsgTopic(kafka.Topic{Name: a.config.EventTopic}))
@@ -371,12 +373,26 @@
}
func (a *adapter) registerWithCore(ctx context.Context, retries int) error {
- logger.Info("registering-with-core")
- adapterDescription := &voltha.Adapter{Id: "openolt", // Unique name for the device type
+ adapterID := fmt.Sprintf("openolt_%d", a.config.CurrentReplica)
+ logger.Infow("registering-with-core", log.Fields{
+ "adapterID": adapterID,
+ "currentReplica": a.config.CurrentReplica,
+ "totalReplicas": a.config.TotalReplicas,
+ })
+ adapterDescription := &voltha.Adapter{
+ Id: adapterID, // Unique name for the device type
Vendor: "VOLTHA OpenOLT",
- Version: version.VersionInfo.Version}
- types := []*voltha.DeviceType{{Id: "openolt",
- Adapter: "openolt", // Name of the adapter that handles device type
+ Version: version.VersionInfo.Version,
+ // TODO once we'll be ready to support multiple versions of the OpenOLT adapter
+ // the Endpoint will have to change to `openolt_<currentReplica`>
+ Endpoint: "openolt",
+ Type: "openolt",
+ CurrentReplica: int32(a.config.CurrentReplica),
+ TotalReplicas: int32(a.config.TotalReplicas),
+ }
+ types := []*voltha.DeviceType{{
+ Id: "openolt",
+ Adapter: "openolt", // Type of the adapter that handles device type
AcceptsBulkFlowUpdate: false, // Currently openolt adapter does not support bulk flow handling
AcceptsAddRemoveFlowUpdates: true}}
deviceTypes := &voltha.DeviceTypes{Items: types}
diff --git a/go.mod b/go.mod
index ce7ccd5..38635fe 100644
--- a/go.mod
+++ b/go.mod
@@ -7,8 +7,8 @@
github.com/cenkalti/backoff/v3 v3.1.1
github.com/gogo/protobuf v1.3.1
github.com/golang/protobuf v1.3.2
- github.com/opencord/voltha-lib-go/v3 v3.1.0
- github.com/opencord/voltha-protos/v3 v3.2.8
+ github.com/opencord/voltha-lib-go/v3 v3.1.1
+ github.com/opencord/voltha-protos/v3 v3.3.0
go.etcd.io/etcd v0.0.0-20190930204107-236ac2a90522
google.golang.org/grpc v1.25.1
)
diff --git a/go.sum b/go.sum
index 417322d..e245104 100644
--- a/go.sum
+++ b/go.sum
@@ -6,6 +6,8 @@
github.com/DataDog/zstd v1.4.1/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
github.com/EagleChen/mapmutex v0.0.0-20180418073615-e1a5ae258d8d h1:j5hduAppx4gHqltfZ1cm7jHbXR0LuQulnF4VkBU8esw=
github.com/EagleChen/mapmutex v0.0.0-20180418073615-e1a5ae258d8d/go.mod h1:H87WPRkM4YDLkW5tC6biLEzWaKtNse5xL1AR91FXC74=
+github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
+github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/Shopify/sarama v1.23.1 h1:XxJBCZEoWJtoWjf/xRbmGUpAmTZGnuuF0ON0EvxxBrs=
github.com/Shopify/sarama v1.23.1/go.mod h1:XLH1GYJnLVE0XCr6KdJGVJRTwY30moWNJ4sERjXX6fs=
github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc=
@@ -25,9 +27,13 @@
github.com/boljen/go-bitmap v0.0.0-20151001105940-23cd2fb0ce7d/go.mod h1:f1iKL6ZhUWvbk7PdWVmOaak10o86cqMUYEmn1CZNGEI=
github.com/bsm/sarama-cluster v2.1.15+incompatible h1:RkV6WiNRnqEEbp81druK8zYhmnIgdOjqSVi0+9Cnl2A=
github.com/bsm/sarama-cluster v2.1.15+incompatible/go.mod h1:r7ao+4tTNXvWm+VRpRJchr2kQhqxgmAp2iEX5W96gMM=
+github.com/buraksezer/consistent v0.0.0-20191006190839-693edf70fd72 h1:fUmDBbSvv1uOzo/t8WaxZMVb7BxJ8JECo5lGoR9c5bA=
+github.com/buraksezer/consistent v0.0.0-20191006190839-693edf70fd72/go.mod h1:OEE5igu/CDjGegM1Jn6ZMo7R6LlV/JChAkjfQQIRLpg=
github.com/cenkalti/backoff/v3 v3.1.1 h1:UBHElAnr3ODEbpqPzX8g5sBcASjoLFtt3L/xwJ01L6E=
github.com/cenkalti/backoff/v3 v3.1.1/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
+github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
+github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cevaris/ordered_map v0.0.0-20190319150403-3adeae072e73 h1:q1g9lSyo/nOIC3W5E3FK3Unrz8b9LdLXCyuC+ZcpPC0=
github.com/cevaris/ordered_map v0.0.0-20190319150403-3adeae072e73/go.mod h1:507vXsotcZop7NZfBWdhPmVeOse4ko2R7AagJYrpoEg=
github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag=
@@ -196,10 +202,10 @@
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.2 h1:3mYCb7aPxS/RU7TI1y4rkEn1oKmPRjNJLNEXgw7MH2I=
github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
-github.com/opencord/voltha-lib-go/v3 v3.1.0 h1:3KPBQT2s5HdCo6bwWHpNFeR8KYbDQMmZEq+aHim6U/o=
-github.com/opencord/voltha-lib-go/v3 v3.1.0/go.mod h1:iOFpFIonmGq6nYG4qYWrF5cKyt7UupDLyB4iGXFDRig=
-github.com/opencord/voltha-protos/v3 v3.2.8 h1:FuMEmUsW4WRpIsNCcELM9QBZZwBLRvuw85DVvfpZ1do=
-github.com/opencord/voltha-protos/v3 v3.2.8/go.mod h1:nl1ETp5Iw3avxOaKD8BJlYY5wYI4KeV95aT1pL63nto=
+github.com/opencord/voltha-lib-go/v3 v3.1.1 h1:mAO+fbkj/Vb8CYOEDSwqfC+/K0eE4kMsNPVdWHZO1bg=
+github.com/opencord/voltha-lib-go/v3 v3.1.1/go.mod h1:ad7C/5/09RcYvGQrxUH4AuOiO8OSQqGmCgEJNEpaag8=
+github.com/opencord/voltha-protos/v3 v3.3.0 h1:1Q1C6nWSkjaJY87GQgc7hWU6kqjkWdM+rzqSXBKb0cQ=
+github.com/opencord/voltha-protos/v3 v3.3.0/go.mod h1:nl1ETp5Iw3avxOaKD8BJlYY5wYI4KeV95aT1pL63nto=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY=
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
@@ -241,6 +247,8 @@
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/soheilhy/cmux v0.1.4 h1:0HKaf1o97UwFjHH9o5XsHUOF+tqmdA7KEzXLpiyaw0E=
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
+github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ=
+github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ=
github.com/spf13/pflag v1.0.1/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
diff --git a/internal/pkg/config/config.go b/internal/pkg/config/config.go
index 49c52e1..06e5322 100644
--- a/internal/pkg/config/config.go
+++ b/internal/pkg/config/config.go
@@ -53,6 +53,8 @@
defaultHearbeatFailReportInterval = 0 * time.Second
//defaultGrpcTimeoutInterval is the time in seconds a grpc call will wait before returning error.
defaultGrpcTimeoutInterval = 2 * time.Second
+ defaultCurrentReplica = 1
+ defaultTotalReplicas = 1
)
// AdapterFlags represents the set of configurations used by the read-write adaptercore service
@@ -81,6 +83,8 @@
HeartbeatCheckInterval time.Duration
HeartbeatFailReportInterval time.Duration
GrpcTimeoutInterval time.Duration
+ CurrentReplica int
+ TotalReplicas int
}
// NewAdapterFlags returns a new RWCore config
@@ -182,6 +186,12 @@
help = fmt.Sprintf("Number of seconds for GRPC timeout.")
flag.DurationVar(&(so.GrpcTimeoutInterval), "grpc_timeout_interval", defaultGrpcTimeoutInterval, help)
+ help = "Replica number of this particular instance (default: %s)"
+ flag.IntVar(&(so.CurrentReplica), "current_replica", defaultCurrentReplica, help)
+
+ help = "Total number of instances for this adapter"
+ flag.IntVar(&(so.TotalReplicas), "total_replica", defaultTotalReplicas, help)
+
flag.Parse()
containerName := getContainerInfo()
if len(containerName) > 0 {
diff --git a/vendor/github.com/buraksezer/consistent/.gitignore b/vendor/github.com/buraksezer/consistent/.gitignore
new file mode 100644
index 0000000..a1338d6
--- /dev/null
+++ b/vendor/github.com/buraksezer/consistent/.gitignore
@@ -0,0 +1,14 @@
+# Binaries for programs and plugins
+*.exe
+*.dll
+*.so
+*.dylib
+
+# Test binary, build with `go test -c`
+*.test
+
+# Output of the go coverage tool, specifically when used with LiteIDE
+*.out
+
+# Project-local glide cache, RE: https://github.com/Masterminds/glide/issues/736
+.glide/
diff --git a/vendor/github.com/buraksezer/consistent/.travis.yml b/vendor/github.com/buraksezer/consistent/.travis.yml
new file mode 100644
index 0000000..4f2ee4d
--- /dev/null
+++ b/vendor/github.com/buraksezer/consistent/.travis.yml
@@ -0,0 +1 @@
+language: go
diff --git a/vendor/github.com/buraksezer/consistent/LICENSE b/vendor/github.com/buraksezer/consistent/LICENSE
new file mode 100644
index 0000000..e470334
--- /dev/null
+++ b/vendor/github.com/buraksezer/consistent/LICENSE
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2018 Burak Sezer
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/vendor/github.com/buraksezer/consistent/README.md b/vendor/github.com/buraksezer/consistent/README.md
new file mode 100644
index 0000000..bde53d1
--- /dev/null
+++ b/vendor/github.com/buraksezer/consistent/README.md
@@ -0,0 +1,235 @@
+consistent
+==========
+[![GoDoc](http://img.shields.io/badge/godoc-reference-blue.svg?style=flat)](https://godoc.org/github.com/buraksezer/consistent) [![Build Status](https://travis-ci.org/buraksezer/consistent.svg?branch=master)](https://travis-ci.org/buraksezer/consistent) [![Coverage](http://gocover.io/_badge/github.com/buraksezer/consistent)](http://gocover.io/github.com/buraksezer/consistent) [![Go Report Card](https://goreportcard.com/badge/github.com/buraksezer/consistent)](https://goreportcard.com/report/github.com/buraksezer/consistent) [![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT) [![Mentioned in Awesome Go](https://awesome.re/mentioned-badge.svg)](https://github.com/avelino/awesome-go)
+
+
+This library provides a consistent hashing function which simultaneously achieves both uniformity and consistency.
+
+For detailed information about the concept, you should take a look at the following resources:
+
+* [Consistent Hashing with Bounded Loads on Google Research Blog](https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html)
+* [Improving load balancing with a new consistent-hashing algorithm on Vimeo Engineering Blog](https://medium.com/vimeo-engineering-blog/improving-load-balancing-with-a-new-consistent-hashing-algorithm-9f1bd75709ed)
+* [Consistent Hashing with Bounded Loads paper on arXiv](https://arxiv.org/abs/1608.01350)
+
+Table of Content
+----------------
+
+- [Overview](#overview)
+- [Install](#install)
+- [Configuration](#configuration)
+- [Usage](#usage)
+- [Benchmarks](#benchmarks)
+- [Examples](#examples)
+
+Overview
+--------
+
+In this package's context, the keys are distributed among partitions and partitions are distributed among members as well.
+
+When you create a new consistent instance or call `Add/Remove`:
+
+* The member's name is hashed and inserted into the hash ring,
+* Average load is calculated by the algorithm defined in the paper,
+* Partitions are distributed among members by hashing partition IDs and none of them exceed the average load.
+
+Average load cannot be exceeded. So if all members are loaded at the maximum while trying to add a new member, it panics.
+
+When you want to locate a key by calling `LocateKey`:
+
+* The key(byte slice) is hashed,
+* The result of the hash is mod by the number of partitions,
+* The result of this modulo - `MOD(hash result, partition count)` - is the partition in which the key will be located,
+* Owner of the partition is already determined before calling `LocateKey`. So it returns the partition owner immediately.
+
+No memory is allocated by `consistent` except hashing when you want to locate a key.
+
+Note that the number of partitions cannot be changed after creation.
+
+Install
+-------
+
+With a correctly configured Go environment:
+
+```
+go get github.com/buraksezer/consistent
+```
+
+You will find some useful usage samples in [examples](https://github.com/buraksezer/consistent/tree/master/_examples) folder.
+
+Configuration
+-------------
+
+```go
+type Config struct {
+ // Hasher is responsible for generating unsigned, 64 bit hash of provided byte slice.
+ Hasher Hasher
+
+ // Keys are distributed among partitions. Prime numbers are good to
+ // distribute keys uniformly. Select a big PartitionCount if you have
+ // too many keys.
+ PartitionCount int
+
+ // Members are replicated on consistent hash ring. This number controls
+ // the number each member is replicated on the ring.
+ ReplicationFactor int
+
+ // Load is used to calculate average load. See the code, the paper and Google's
+ // blog post to learn about it.
+ Load float64
+}
+```
+
+Any hash algorithm can be used as hasher which implements Hasher interface. Please take a look at the *Sample* section for an example.
+
+Usage
+-----
+
+`LocateKey` function finds a member in the cluster for your key:
+```go
+// With a properly configured and initialized consistent instance
+key := []byte("my-key")
+member := c.LocateKey(key)
+```
+It returns a thread-safe copy of the member you added before.
+
+The second most frequently used function is `GetClosestN`.
+
+```go
+// With a properly configured and initialized consistent instance
+
+key := []byte("my-key")
+members, err := c.GetClosestN(key, 2)
+```
+
+This may be useful to find backup nodes to store your key.
+
+Benchmarks
+----------
+On an early 2015 Macbook:
+
+```
+BenchmarkAddRemove-4 100000 22006 ns/op
+BenchmarkLocateKey-4 5000000 252 ns/op
+BenchmarkGetClosestN-4 500000 2974 ns/op
+```
+
+Examples
+--------
+
+The most basic use of consistent package should be like this. For detailed list of functions, [visit godoc.org.](https://godoc.org/github.com/buraksezer/consistent)
+More sample code can be found under [_examples](https://github.com/buraksezer/consistent/tree/master/_examples).
+
+```go
+package main
+
+import (
+ "fmt"
+
+ "github.com/buraksezer/consistent"
+ "github.com/cespare/xxhash"
+)
+
+// In your code, you probably have a custom data type
+// for your cluster members. Just add a String function to implement
+// consistent.Member interface.
+type myMember string
+
+func (m myMember) String() string {
+ return string(m)
+}
+
+// consistent package doesn't provide a default hashing function.
+// You should provide a proper one to distribute keys/members uniformly.
+type hasher struct{}
+
+func (h hasher) Sum64(data []byte) uint64 {
+ // you should use a proper hash function for uniformity.
+ return xxhash.Sum64(data)
+}
+
+func main() {
+ // Create a new consistent instance
+ cfg := consistent.Config{
+ PartitionCount: 7,
+ ReplicationFactor: 20,
+ Load: 1.25,
+ Hasher: hasher{},
+ }
+ c := consistent.New(nil, cfg)
+
+ // Add some members to the consistent hash table.
+ // Add function calculates average load and distributes partitions over members
+ node1 := myMember("node1.olric.com")
+ c.Add(node1)
+
+ node2 := myMember("node2.olric.com")
+ c.Add(node2)
+
+ key := []byte("my-key")
+ // calculates partition id for the given key
+ // partID := hash(key) % partitionCount
+ // the partitions are already distributed among members by Add function.
+ owner := c.LocateKey(key)
+ fmt.Println(owner.String())
+ // Prints node2.olric.com
+}
+```
+
+Another useful example is `_examples/relocation_percentage.go`. It creates a `consistent` object with 8 members and distributes partitions among them. Then adds 9th member,
+here is the result with a proper configuration and hash function:
+
+```
+bloom:consistent burak$ go run _examples/relocation_percentage.go
+partID: 218 moved to node2.olric from node0.olric
+partID: 173 moved to node9.olric from node3.olric
+partID: 225 moved to node7.olric from node0.olric
+partID: 85 moved to node9.olric from node7.olric
+partID: 220 moved to node5.olric from node0.olric
+partID: 33 moved to node9.olric from node5.olric
+partID: 254 moved to node9.olric from node4.olric
+partID: 71 moved to node9.olric from node3.olric
+partID: 236 moved to node9.olric from node2.olric
+partID: 118 moved to node9.olric from node3.olric
+partID: 233 moved to node3.olric from node0.olric
+partID: 50 moved to node9.olric from node4.olric
+partID: 252 moved to node9.olric from node2.olric
+partID: 121 moved to node9.olric from node2.olric
+partID: 259 moved to node9.olric from node4.olric
+partID: 92 moved to node9.olric from node7.olric
+partID: 152 moved to node9.olric from node3.olric
+partID: 105 moved to node9.olric from node2.olric
+
+6% of the partitions are relocated
+```
+
+Moved partition count is highly dependent on your configuration and quailty of hash function. You should modify the configuration to find an optimum set of configurations
+for your system.
+
+`_examples/load_distribution.go` is also useful to understand load distribution. It creates a `consistent` object with 8 members and locates 1M key. It also calculates average
+load which cannot be exceeded by any member. Here is the result:
+
+```
+Maximum key count for a member should be around this: 147602
+member: node2.olric, key count: 100362
+member: node5.olric, key count: 99448
+member: node0.olric, key count: 147735
+member: node3.olric, key count: 103455
+member: node6.olric, key count: 147069
+member: node1.olric, key count: 121566
+member: node4.olric, key count: 147932
+member: node7.olric, key count: 132433
+```
+
+Average load can be calculated by using the following formula:
+
+```
+load := (consistent.AverageLoad() * float64(keyCount)) / float64(config.PartitionCount)
+```
+
+Contributions
+-------------
+Please don't hesitate to fork the project and send a pull request or just e-mail me to ask questions and share ideas.
+
+License
+-------
+MIT License, - see LICENSE for more details.
diff --git a/vendor/github.com/buraksezer/consistent/consistent.go b/vendor/github.com/buraksezer/consistent/consistent.go
new file mode 100644
index 0000000..a1446d6
--- /dev/null
+++ b/vendor/github.com/buraksezer/consistent/consistent.go
@@ -0,0 +1,362 @@
+// Copyright (c) 2018 Burak Sezer
+// All rights reserved.
+//
+// This code is licensed under the MIT License.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files(the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions :
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+// Package consistent provides a consistent hashing function with bounded loads.
+// For more information about the underlying algorithm, please take a look at
+// https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html
+//
+// Example Use:
+// cfg := consistent.Config{
+// PartitionCount: 71,
+// ReplicationFactor: 20,
+// Load: 1.25,
+// Hasher: hasher{},
+// }
+//
+// // Create a new consistent object
+// // You may call this with a list of members
+// // instead of adding them one by one.
+// c := consistent.New(members, cfg)
+//
+// // myMember struct just needs to implement a String method.
+// // New/Add/Remove distributes partitions among members using the algorithm
+// // defined on Google Research Blog.
+// c.Add(myMember)
+//
+// key := []byte("my-key")
+// // LocateKey hashes the key and calculates partition ID with
+// // this modulo operation: MOD(hash result, partition count)
+// // The owner of the partition is already calculated by New/Add/Remove.
+// // LocateKey just returns the member which's responsible for the key.
+// member := c.LocateKey(key)
+//
+package consistent
+
+import (
+ "encoding/binary"
+ "errors"
+ "fmt"
+ "math"
+ "sort"
+ "sync"
+)
+
+var (
+ //ErrInsufficientMemberCount represents an error which means there are not enough members to complete the task.
+ ErrInsufficientMemberCount = errors.New("insufficient member count")
+
+ // ErrMemberNotFound represents an error which means requested member could not be found in consistent hash ring.
+ ErrMemberNotFound = errors.New("member could not be found in ring")
+)
+
+// Hasher is responsible for generating unsigned, 64 bit hash of provided byte slice.
+// Hasher should minimize collisions (generating same hash for different byte slice)
+// and while performance is also important fast functions are preferable (i.e.
+// you can use FarmHash family).
+type Hasher interface {
+ Sum64([]byte) uint64
+}
+
+// Member interface represents a member in consistent hash ring.
+type Member interface {
+ String() string
+}
+
+// Config represents a structure to control consistent package.
+type Config struct {
+ // Hasher is responsible for generating unsigned, 64 bit hash of provided byte slice.
+ Hasher Hasher
+
+ // Keys are distributed among partitions. Prime numbers are good to
+ // distribute keys uniformly. Select a big PartitionCount if you have
+ // too many keys.
+ PartitionCount int
+
+ // Members are replicated on consistent hash ring. This number means that a member
+ // how many times replicated on the ring.
+ ReplicationFactor int
+
+ // Load is used to calculate average load. See the code, the paper and Google's blog post to learn about it.
+ Load float64
+}
+
+// Consistent holds the information about the members of the consistent hash circle.
+type Consistent struct {
+ mu sync.RWMutex
+
+ config Config
+ hasher Hasher
+ sortedSet []uint64
+ partitionCount uint64
+ loads map[string]float64
+ members map[string]*Member
+ partitions map[int]*Member
+ ring map[uint64]*Member
+}
+
+// New creates and returns a new Consistent object.
+func New(members []Member, config Config) *Consistent {
+ c := &Consistent{
+ config: config,
+ members: make(map[string]*Member),
+ partitionCount: uint64(config.PartitionCount),
+ ring: make(map[uint64]*Member),
+ }
+ if config.Hasher == nil {
+ panic("Hasher cannot be nil")
+ }
+ // TODO: Check configuration here
+ c.hasher = config.Hasher
+ for _, member := range members {
+ c.add(member)
+ }
+ if members != nil {
+ c.distributePartitions()
+ }
+ return c
+}
+
+// GetMembers returns a thread-safe copy of members.
+func (c *Consistent) GetMembers() []Member {
+ c.mu.RLock()
+ defer c.mu.RUnlock()
+
+ // Create a thread-safe copy of member list.
+ members := make([]Member, 0, len(c.members))
+ for _, member := range c.members {
+ members = append(members, *member)
+ }
+ return members
+}
+
+// AverageLoad exposes the current average load.
+func (c *Consistent) AverageLoad() float64 {
+ avgLoad := float64(c.partitionCount/uint64(len(c.members))) * c.config.Load
+ return math.Ceil(avgLoad)
+}
+
+func (c *Consistent) distributeWithLoad(partID, idx int, partitions map[int]*Member, loads map[string]float64) {
+ avgLoad := c.AverageLoad()
+ var count int
+ for {
+ count++
+ if count >= len(c.sortedSet) {
+ // User needs to decrease partition count, increase member count or increase load factor.
+ panic("not enough room to distribute partitions")
+ }
+ i := c.sortedSet[idx]
+ member := *c.ring[i]
+ load := loads[member.String()]
+ if load+1 <= avgLoad {
+ partitions[partID] = &member
+ loads[member.String()]++
+ return
+ }
+ idx++
+ if idx >= len(c.sortedSet) {
+ idx = 0
+ }
+ }
+}
+
+func (c *Consistent) distributePartitions() {
+ loads := make(map[string]float64)
+ partitions := make(map[int]*Member)
+
+ bs := make([]byte, 8)
+ for partID := uint64(0); partID < c.partitionCount; partID++ {
+ binary.LittleEndian.PutUint64(bs, partID)
+ key := c.hasher.Sum64(bs)
+ idx := sort.Search(len(c.sortedSet), func(i int) bool {
+ return c.sortedSet[i] >= key
+ })
+ if idx >= len(c.sortedSet) {
+ idx = 0
+ }
+ c.distributeWithLoad(int(partID), idx, partitions, loads)
+ }
+ c.partitions = partitions
+ c.loads = loads
+}
+
+func (c *Consistent) add(member Member) {
+ for i := 0; i < c.config.ReplicationFactor; i++ {
+ key := []byte(fmt.Sprintf("%s%d", member.String(), i))
+ h := c.hasher.Sum64(key)
+ c.ring[h] = &member
+ c.sortedSet = append(c.sortedSet, h)
+ }
+ // sort hashes ascendingly
+ sort.Slice(c.sortedSet, func(i int, j int) bool {
+ return c.sortedSet[i] < c.sortedSet[j]
+ })
+ // Storing member at this map is useful to find backup members of a partition.
+ c.members[member.String()] = &member
+}
+
+// Add adds a new member to the consistent hash circle.
+func (c *Consistent) Add(member Member) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ if _, ok := c.members[member.String()]; ok {
+ // We already have this member. Quit immediately.
+ return
+ }
+ c.add(member)
+ c.distributePartitions()
+}
+
+func (c *Consistent) delSlice(val uint64) {
+ for i := 0; i < len(c.sortedSet); i++ {
+ if c.sortedSet[i] == val {
+ c.sortedSet = append(c.sortedSet[:i], c.sortedSet[i+1:]...)
+ break
+ }
+ }
+}
+
+// Remove removes a member from the consistent hash circle.
+func (c *Consistent) Remove(name string) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ if _, ok := c.members[name]; !ok {
+ // There is no member with that name. Quit immediately.
+ return
+ }
+
+ for i := 0; i < c.config.ReplicationFactor; i++ {
+ key := []byte(fmt.Sprintf("%s%d", name, i))
+ h := c.hasher.Sum64(key)
+ delete(c.ring, h)
+ c.delSlice(h)
+ }
+ delete(c.members, name)
+ if len(c.members) == 0 {
+ // consistent hash ring is empty now. Reset the partition table.
+ c.partitions = make(map[int]*Member)
+ return
+ }
+ c.distributePartitions()
+}
+
+// LoadDistribution exposes load distribution of members.
+func (c *Consistent) LoadDistribution() map[string]float64 {
+ c.mu.RLock()
+ defer c.mu.RUnlock()
+
+ // Create a thread-safe copy
+ res := make(map[string]float64)
+ for member, load := range c.loads {
+ res[member] = load
+ }
+ return res
+}
+
+// FindPartitionID returns partition id for given key.
+func (c *Consistent) FindPartitionID(key []byte) int {
+ hkey := c.hasher.Sum64(key)
+ return int(hkey % c.partitionCount)
+}
+
+// GetPartitionOwner returns the owner of the given partition.
+func (c *Consistent) GetPartitionOwner(partID int) Member {
+ c.mu.RLock()
+ defer c.mu.RUnlock()
+
+ member, ok := c.partitions[partID]
+ if !ok {
+ return nil
+ }
+ // Create a thread-safe copy of member and return it.
+ return *member
+}
+
+// LocateKey finds a home for given key
+func (c *Consistent) LocateKey(key []byte) Member {
+ partID := c.FindPartitionID(key)
+ return c.GetPartitionOwner(partID)
+}
+
+func (c *Consistent) getClosestN(partID, count int) ([]Member, error) {
+ c.mu.RLock()
+ defer c.mu.RUnlock()
+
+ res := []Member{}
+ if count > len(c.members) {
+ return res, ErrInsufficientMemberCount
+ }
+
+ var ownerKey uint64
+ owner := c.GetPartitionOwner(partID)
+ // Hash and sort all the names.
+ keys := []uint64{}
+ kmems := make(map[uint64]*Member)
+ for name, member := range c.members {
+ key := c.hasher.Sum64([]byte(name))
+ if name == owner.String() {
+ ownerKey = key
+ }
+ keys = append(keys, key)
+ kmems[key] = member
+ }
+ sort.Slice(keys, func(i, j int) bool {
+ return keys[i] < keys[j]
+ })
+
+ // Find the key owner
+ idx := 0
+ for idx < len(keys) {
+ if keys[idx] == ownerKey {
+ key := keys[idx]
+ res = append(res, *kmems[key])
+ break
+ }
+ idx++
+ }
+
+ // Find the closest(replica owners) members.
+ for len(res) < count {
+ idx++
+ if idx >= len(keys) {
+ idx = 0
+ }
+ key := keys[idx]
+ res = append(res, *kmems[key])
+ }
+ return res, nil
+}
+
+// GetClosestN returns the closest N member to a key in the hash ring.
+// This may be useful to find members for replication.
+func (c *Consistent) GetClosestN(key []byte, count int) ([]Member, error) {
+ partID := c.FindPartitionID(key)
+ return c.getClosestN(partID, count)
+}
+
+// GetClosestNForPartition returns the closest N member for given partition.
+// This may be useful to find members for replication.
+func (c *Consistent) GetClosestNForPartition(partID, count int) ([]Member, error) {
+ return c.getClosestN(partID, count)
+}
diff --git a/vendor/github.com/cespare/xxhash/LICENSE.txt b/vendor/github.com/cespare/xxhash/LICENSE.txt
new file mode 100644
index 0000000..24b5306
--- /dev/null
+++ b/vendor/github.com/cespare/xxhash/LICENSE.txt
@@ -0,0 +1,22 @@
+Copyright (c) 2016 Caleb Spare
+
+MIT License
+
+Permission is hereby granted, free of charge, to any person obtaining
+a copy of this software and associated documentation files (the
+"Software"), to deal in the Software without restriction, including
+without limitation the rights to use, copy, modify, merge, publish,
+distribute, sublicense, and/or sell copies of the Software, and to
+permit persons to whom the Software is furnished to do so, subject to
+the following conditions:
+
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
diff --git a/vendor/github.com/cespare/xxhash/README.md b/vendor/github.com/cespare/xxhash/README.md
new file mode 100644
index 0000000..0982fd2
--- /dev/null
+++ b/vendor/github.com/cespare/xxhash/README.md
@@ -0,0 +1,50 @@
+# xxhash
+
+[![GoDoc](https://godoc.org/github.com/cespare/xxhash?status.svg)](https://godoc.org/github.com/cespare/xxhash)
+
+xxhash is a Go implementation of the 64-bit
+[xxHash](http://cyan4973.github.io/xxHash/) algorithm, XXH64. This is a
+high-quality hashing algorithm that is much faster than anything in the Go
+standard library.
+
+The API is very small, taking its cue from the other hashing packages in the
+standard library:
+
+ $ go doc github.com/cespare/xxhash !
+ package xxhash // import "github.com/cespare/xxhash"
+
+ Package xxhash implements the 64-bit variant of xxHash (XXH64) as described
+ at http://cyan4973.github.io/xxHash/.
+
+ func New() hash.Hash64
+ func Sum64(b []byte) uint64
+ func Sum64String(s string) uint64
+
+This implementation provides a fast pure-Go implementation and an even faster
+assembly implementation for amd64.
+
+## Benchmarks
+
+Here are some quick benchmarks comparing the pure-Go and assembly
+implementations of Sum64 against another popular Go XXH64 implementation,
+[github.com/OneOfOne/xxhash](https://github.com/OneOfOne/xxhash):
+
+| input size | OneOfOne | cespare (purego) | cespare |
+| --- | --- | --- | --- |
+| 5 B | 416 MB/s | 720 MB/s | 872 MB/s |
+| 100 B | 3980 MB/s | 5013 MB/s | 5252 MB/s |
+| 4 KB | 12727 MB/s | 12999 MB/s | 13026 MB/s |
+| 10 MB | 9879 MB/s | 10775 MB/s | 10913 MB/s |
+
+These numbers were generated with:
+
+```
+$ go test -benchtime 10s -bench '/OneOfOne,'
+$ go test -tags purego -benchtime 10s -bench '/xxhash,'
+$ go test -benchtime 10s -bench '/xxhash,'
+```
+
+## Projects using this package
+
+- [InfluxDB](https://github.com/influxdata/influxdb)
+- [Prometheus](https://github.com/prometheus/prometheus)
diff --git a/vendor/github.com/cespare/xxhash/go.mod b/vendor/github.com/cespare/xxhash/go.mod
new file mode 100644
index 0000000..10605a6
--- /dev/null
+++ b/vendor/github.com/cespare/xxhash/go.mod
@@ -0,0 +1,6 @@
+module github.com/cespare/xxhash
+
+require (
+ github.com/OneOfOne/xxhash v1.2.2
+ github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72
+)
diff --git a/vendor/github.com/cespare/xxhash/go.sum b/vendor/github.com/cespare/xxhash/go.sum
new file mode 100644
index 0000000..f6b5542
--- /dev/null
+++ b/vendor/github.com/cespare/xxhash/go.sum
@@ -0,0 +1,4 @@
+github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
+github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
+github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ=
+github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
diff --git a/vendor/github.com/cespare/xxhash/rotate.go b/vendor/github.com/cespare/xxhash/rotate.go
new file mode 100644
index 0000000..f3eac5e
--- /dev/null
+++ b/vendor/github.com/cespare/xxhash/rotate.go
@@ -0,0 +1,14 @@
+// +build !go1.9
+
+package xxhash
+
+// TODO(caleb): After Go 1.10 comes out, remove this fallback code.
+
+func rol1(x uint64) uint64 { return (x << 1) | (x >> (64 - 1)) }
+func rol7(x uint64) uint64 { return (x << 7) | (x >> (64 - 7)) }
+func rol11(x uint64) uint64 { return (x << 11) | (x >> (64 - 11)) }
+func rol12(x uint64) uint64 { return (x << 12) | (x >> (64 - 12)) }
+func rol18(x uint64) uint64 { return (x << 18) | (x >> (64 - 18)) }
+func rol23(x uint64) uint64 { return (x << 23) | (x >> (64 - 23)) }
+func rol27(x uint64) uint64 { return (x << 27) | (x >> (64 - 27)) }
+func rol31(x uint64) uint64 { return (x << 31) | (x >> (64 - 31)) }
diff --git a/vendor/github.com/cespare/xxhash/rotate19.go b/vendor/github.com/cespare/xxhash/rotate19.go
new file mode 100644
index 0000000..b99612b
--- /dev/null
+++ b/vendor/github.com/cespare/xxhash/rotate19.go
@@ -0,0 +1,14 @@
+// +build go1.9
+
+package xxhash
+
+import "math/bits"
+
+func rol1(x uint64) uint64 { return bits.RotateLeft64(x, 1) }
+func rol7(x uint64) uint64 { return bits.RotateLeft64(x, 7) }
+func rol11(x uint64) uint64 { return bits.RotateLeft64(x, 11) }
+func rol12(x uint64) uint64 { return bits.RotateLeft64(x, 12) }
+func rol18(x uint64) uint64 { return bits.RotateLeft64(x, 18) }
+func rol23(x uint64) uint64 { return bits.RotateLeft64(x, 23) }
+func rol27(x uint64) uint64 { return bits.RotateLeft64(x, 27) }
+func rol31(x uint64) uint64 { return bits.RotateLeft64(x, 31) }
diff --git a/vendor/github.com/cespare/xxhash/xxhash.go b/vendor/github.com/cespare/xxhash/xxhash.go
new file mode 100644
index 0000000..f896bd2
--- /dev/null
+++ b/vendor/github.com/cespare/xxhash/xxhash.go
@@ -0,0 +1,168 @@
+// Package xxhash implements the 64-bit variant of xxHash (XXH64) as described
+// at http://cyan4973.github.io/xxHash/.
+package xxhash
+
+import (
+ "encoding/binary"
+ "hash"
+)
+
+const (
+ prime1 uint64 = 11400714785074694791
+ prime2 uint64 = 14029467366897019727
+ prime3 uint64 = 1609587929392839161
+ prime4 uint64 = 9650029242287828579
+ prime5 uint64 = 2870177450012600261
+)
+
+// NOTE(caleb): I'm using both consts and vars of the primes. Using consts where
+// possible in the Go code is worth a small (but measurable) performance boost
+// by avoiding some MOVQs. Vars are needed for the asm and also are useful for
+// convenience in the Go code in a few places where we need to intentionally
+// avoid constant arithmetic (e.g., v1 := prime1 + prime2 fails because the
+// result overflows a uint64).
+var (
+ prime1v = prime1
+ prime2v = prime2
+ prime3v = prime3
+ prime4v = prime4
+ prime5v = prime5
+)
+
+type xxh struct {
+ v1 uint64
+ v2 uint64
+ v3 uint64
+ v4 uint64
+ total int
+ mem [32]byte
+ n int // how much of mem is used
+}
+
+// New creates a new hash.Hash64 that implements the 64-bit xxHash algorithm.
+func New() hash.Hash64 {
+ var x xxh
+ x.Reset()
+ return &x
+}
+
+func (x *xxh) Reset() {
+ x.n = 0
+ x.total = 0
+ x.v1 = prime1v + prime2
+ x.v2 = prime2
+ x.v3 = 0
+ x.v4 = -prime1v
+}
+
+func (x *xxh) Size() int { return 8 }
+func (x *xxh) BlockSize() int { return 32 }
+
+// Write adds more data to x. It always returns len(b), nil.
+func (x *xxh) Write(b []byte) (n int, err error) {
+ n = len(b)
+ x.total += len(b)
+
+ if x.n+len(b) < 32 {
+ // This new data doesn't even fill the current block.
+ copy(x.mem[x.n:], b)
+ x.n += len(b)
+ return
+ }
+
+ if x.n > 0 {
+ // Finish off the partial block.
+ copy(x.mem[x.n:], b)
+ x.v1 = round(x.v1, u64(x.mem[0:8]))
+ x.v2 = round(x.v2, u64(x.mem[8:16]))
+ x.v3 = round(x.v3, u64(x.mem[16:24]))
+ x.v4 = round(x.v4, u64(x.mem[24:32]))
+ b = b[32-x.n:]
+ x.n = 0
+ }
+
+ if len(b) >= 32 {
+ // One or more full blocks left.
+ b = writeBlocks(x, b)
+ }
+
+ // Store any remaining partial block.
+ copy(x.mem[:], b)
+ x.n = len(b)
+
+ return
+}
+
+func (x *xxh) Sum(b []byte) []byte {
+ s := x.Sum64()
+ return append(
+ b,
+ byte(s>>56),
+ byte(s>>48),
+ byte(s>>40),
+ byte(s>>32),
+ byte(s>>24),
+ byte(s>>16),
+ byte(s>>8),
+ byte(s),
+ )
+}
+
+func (x *xxh) Sum64() uint64 {
+ var h uint64
+
+ if x.total >= 32 {
+ v1, v2, v3, v4 := x.v1, x.v2, x.v3, x.v4
+ h = rol1(v1) + rol7(v2) + rol12(v3) + rol18(v4)
+ h = mergeRound(h, v1)
+ h = mergeRound(h, v2)
+ h = mergeRound(h, v3)
+ h = mergeRound(h, v4)
+ } else {
+ h = x.v3 + prime5
+ }
+
+ h += uint64(x.total)
+
+ i, end := 0, x.n
+ for ; i+8 <= end; i += 8 {
+ k1 := round(0, u64(x.mem[i:i+8]))
+ h ^= k1
+ h = rol27(h)*prime1 + prime4
+ }
+ if i+4 <= end {
+ h ^= uint64(u32(x.mem[i:i+4])) * prime1
+ h = rol23(h)*prime2 + prime3
+ i += 4
+ }
+ for i < end {
+ h ^= uint64(x.mem[i]) * prime5
+ h = rol11(h) * prime1
+ i++
+ }
+
+ h ^= h >> 33
+ h *= prime2
+ h ^= h >> 29
+ h *= prime3
+ h ^= h >> 32
+
+ return h
+}
+
+func u64(b []byte) uint64 { return binary.LittleEndian.Uint64(b) }
+func u32(b []byte) uint32 { return binary.LittleEndian.Uint32(b) }
+
+func round(acc, input uint64) uint64 {
+ acc += input * prime2
+ acc = rol31(acc)
+ acc *= prime1
+ return acc
+}
+
+func mergeRound(acc, val uint64) uint64 {
+ val = round(0, val)
+ acc ^= val
+ acc = acc*prime1 + prime4
+ return acc
+}
diff --git a/vendor/github.com/cespare/xxhash/xxhash_amd64.go b/vendor/github.com/cespare/xxhash/xxhash_amd64.go
new file mode 100644
index 0000000..d617652
--- /dev/null
+++ b/vendor/github.com/cespare/xxhash/xxhash_amd64.go
@@ -0,0 +1,12 @@
+// +build !appengine
+// +build gc
+// +build !purego
+
+package xxhash
+
+// Sum64 computes the 64-bit xxHash digest of b.
+//
+//go:noescape
+func Sum64(b []byte) uint64
+
+func writeBlocks(x *xxh, b []byte) []byte
diff --git a/vendor/github.com/cespare/xxhash/xxhash_amd64.s b/vendor/github.com/cespare/xxhash/xxhash_amd64.s
new file mode 100644
index 0000000..757f201
--- /dev/null
+++ b/vendor/github.com/cespare/xxhash/xxhash_amd64.s
@@ -0,0 +1,233 @@
+// +build !appengine
+// +build gc
+// +build !purego
+
+#include "textflag.h"
+
+// Register allocation:
+// AX h
+// CX pointer to advance through b
+// DX n
+// BX loop end
+// R8 v1, k1
+// R9 v2
+// R10 v3
+// R11 v4
+// R12 tmp
+// R13 prime1v
+// R14 prime2v
+// R15 prime4v
+
+// round reads from and advances the buffer pointer in CX.
+// It assumes that R13 has prime1v and R14 has prime2v.
+#define round(r) \
+ MOVQ (CX), R12 \
+ ADDQ $8, CX \
+ IMULQ R14, R12 \
+ ADDQ R12, r \
+ ROLQ $31, r \
+ IMULQ R13, r
+
+// mergeRound applies a merge round on the two registers acc and val.
+// It assumes that R13 has prime1v, R14 has prime2v, and R15 has prime4v.
+#define mergeRound(acc, val) \
+ IMULQ R14, val \
+ ROLQ $31, val \
+ IMULQ R13, val \
+ XORQ val, acc \
+ IMULQ R13, acc \
+ ADDQ R15, acc
+
+// func Sum64(b []byte) uint64
+TEXT ·Sum64(SB), NOSPLIT, $0-32
+ // Load fixed primes.
+ MOVQ ·prime1v(SB), R13
+ MOVQ ·prime2v(SB), R14
+ MOVQ ·prime4v(SB), R15
+
+ // Load slice.
+ MOVQ b_base+0(FP), CX
+ MOVQ b_len+8(FP), DX
+ LEAQ (CX)(DX*1), BX
+
+ // The first loop limit will be len(b)-32.
+ SUBQ $32, BX
+
+ // Check whether we have at least one block.
+ CMPQ DX, $32
+ JLT noBlocks
+
+ // Set up initial state (v1, v2, v3, v4).
+ MOVQ R13, R8
+ ADDQ R14, R8
+ MOVQ R14, R9
+ XORQ R10, R10
+ XORQ R11, R11
+ SUBQ R13, R11
+
+ // Loop until CX > BX.
+blockLoop:
+ round(R8)
+ round(R9)
+ round(R10)
+ round(R11)
+
+ CMPQ CX, BX
+ JLE blockLoop
+
+ MOVQ R8, AX
+ ROLQ $1, AX
+ MOVQ R9, R12
+ ROLQ $7, R12
+ ADDQ R12, AX
+ MOVQ R10, R12
+ ROLQ $12, R12
+ ADDQ R12, AX
+ MOVQ R11, R12
+ ROLQ $18, R12
+ ADDQ R12, AX
+
+ mergeRound(AX, R8)
+ mergeRound(AX, R9)
+ mergeRound(AX, R10)
+ mergeRound(AX, R11)
+
+ JMP afterBlocks
+
+noBlocks:
+ MOVQ ·prime5v(SB), AX
+
+afterBlocks:
+ ADDQ DX, AX
+
+ // Right now BX has len(b)-32, and we want to loop until CX > len(b)-8.
+ ADDQ $24, BX
+
+ CMPQ CX, BX
+ JG fourByte
+
+wordLoop:
+ // Calculate k1.
+ MOVQ (CX), R8
+ ADDQ $8, CX
+ IMULQ R14, R8
+ ROLQ $31, R8
+ IMULQ R13, R8
+
+ XORQ R8, AX
+ ROLQ $27, AX
+ IMULQ R13, AX
+ ADDQ R15, AX
+
+ CMPQ CX, BX
+ JLE wordLoop
+
+fourByte:
+ ADDQ $4, BX
+ CMPQ CX, BX
+ JG singles
+
+ MOVL (CX), R8
+ ADDQ $4, CX
+ IMULQ R13, R8
+ XORQ R8, AX
+
+ ROLQ $23, AX
+ IMULQ R14, AX
+ ADDQ ·prime3v(SB), AX
+
+singles:
+ ADDQ $4, BX
+ CMPQ CX, BX
+ JGE finalize
+
+singlesLoop:
+ MOVBQZX (CX), R12
+ ADDQ $1, CX
+ IMULQ ·prime5v(SB), R12
+ XORQ R12, AX
+
+ ROLQ $11, AX
+ IMULQ R13, AX
+
+ CMPQ CX, BX
+ JL singlesLoop
+
+finalize:
+ MOVQ AX, R12
+ SHRQ $33, R12
+ XORQ R12, AX
+ IMULQ R14, AX
+ MOVQ AX, R12
+ SHRQ $29, R12
+ XORQ R12, AX
+ IMULQ ·prime3v(SB), AX
+ MOVQ AX, R12
+ SHRQ $32, R12
+ XORQ R12, AX
+
+ MOVQ AX, ret+24(FP)
+ RET
+
+// writeBlocks uses the same registers as above except that it uses AX to store
+// the x pointer.
+
+// func writeBlocks(x *xxh, b []byte) []byte
+TEXT ·writeBlocks(SB), NOSPLIT, $0-56
+ // Load fixed primes needed for round.
+ MOVQ ·prime1v(SB), R13
+ MOVQ ·prime2v(SB), R14
+
+ // Load slice.
+ MOVQ b_base+8(FP), CX
+ MOVQ CX, ret_base+32(FP) // initialize return base pointer; see NOTE below
+ MOVQ b_len+16(FP), DX
+ LEAQ (CX)(DX*1), BX
+ SUBQ $32, BX
+
+ // Load vN from x.
+ MOVQ x+0(FP), AX
+ MOVQ 0(AX), R8 // v1
+ MOVQ 8(AX), R9 // v2
+ MOVQ 16(AX), R10 // v3
+ MOVQ 24(AX), R11 // v4
+
+ // We don't need to check the loop condition here; this function is
+ // always called with at least one block of data to process.
+blockLoop:
+ round(R8)
+ round(R9)
+ round(R10)
+ round(R11)
+
+ CMPQ CX, BX
+ JLE blockLoop
+
+ // Copy vN back to x.
+ MOVQ R8, 0(AX)
+ MOVQ R9, 8(AX)
+ MOVQ R10, 16(AX)
+ MOVQ R11, 24(AX)
+
+ // Construct return slice.
+ // NOTE: It's important that we don't construct a slice that has a base
+ // pointer off the end of the original slice, as in Go 1.7+ this will
+ // cause runtime crashes. (See discussion in, for example,
+ // https://github.com/golang/go/issues/16772.)
+ // Therefore, we calculate the length/cap first, and if they're zero, we
+ // keep the old base. This is what the compiler does as well if you
+ // write code like
+ // b = b[len(b):]
+
+ // New length is 32 - (CX - BX) -> BX+32 - CX.
+ ADDQ $32, BX
+ SUBQ CX, BX
+ JZ afterSetBase
+
+ MOVQ CX, ret_base+32(FP)
+
+afterSetBase:
+ MOVQ BX, ret_len+40(FP)
+ MOVQ BX, ret_cap+48(FP) // set cap == len
+
+ RET
diff --git a/vendor/github.com/cespare/xxhash/xxhash_other.go b/vendor/github.com/cespare/xxhash/xxhash_other.go
new file mode 100644
index 0000000..c68d13f
--- /dev/null
+++ b/vendor/github.com/cespare/xxhash/xxhash_other.go
@@ -0,0 +1,75 @@
+// +build !amd64 appengine !gc purego
+
+package xxhash
+
+// Sum64 computes the 64-bit xxHash digest of b.
+func Sum64(b []byte) uint64 {
+ // A simpler version would be
+ // x := New()
+ // x.Write(b)
+ // return x.Sum64()
+ // but this is faster, particularly for small inputs.
+
+ n := len(b)
+ var h uint64
+
+ if n >= 32 {
+ v1 := prime1v + prime2
+ v2 := prime2
+ v3 := uint64(0)
+ v4 := -prime1v
+ for len(b) >= 32 {
+ v1 = round(v1, u64(b[0:8:len(b)]))
+ v2 = round(v2, u64(b[8:16:len(b)]))
+ v3 = round(v3, u64(b[16:24:len(b)]))
+ v4 = round(v4, u64(b[24:32:len(b)]))
+ b = b[32:len(b):len(b)]
+ }
+ h = rol1(v1) + rol7(v2) + rol12(v3) + rol18(v4)
+ h = mergeRound(h, v1)
+ h = mergeRound(h, v2)
+ h = mergeRound(h, v3)
+ h = mergeRound(h, v4)
+ } else {
+ h = prime5
+ }
+
+ h += uint64(n)
+
+ i, end := 0, len(b)
+ for ; i+8 <= end; i += 8 {
+ k1 := round(0, u64(b[i:i+8:len(b)]))
+ h ^= k1
+ h = rol27(h)*prime1 + prime4
+ }
+ if i+4 <= end {
+ h ^= uint64(u32(b[i:i+4:len(b)])) * prime1
+ h = rol23(h)*prime2 + prime3
+ i += 4
+ }
+ for ; i < end; i++ {
+ h ^= uint64(b[i]) * prime5
+ h = rol11(h) * prime1
+ }
+
+ h ^= h >> 33
+ h *= prime2
+ h ^= h >> 29
+ h *= prime3
+ h ^= h >> 32
+
+ return h
+}
+
+func writeBlocks(x *xxh, b []byte) []byte {
+ v1, v2, v3, v4 := x.v1, x.v2, x.v3, x.v4
+ for len(b) >= 32 {
+ v1 = round(v1, u64(b[0:8:len(b)]))
+ v2 = round(v2, u64(b[8:16:len(b)]))
+ v3 = round(v3, u64(b[16:24:len(b)]))
+ v4 = round(v4, u64(b[24:32:len(b)]))
+ b = b[32:len(b):len(b)]
+ }
+ x.v1, x.v2, x.v3, x.v4 = v1, v2, v3, v4
+ return b
+}
diff --git a/vendor/github.com/cespare/xxhash/xxhash_safe.go b/vendor/github.com/cespare/xxhash/xxhash_safe.go
new file mode 100644
index 0000000..dfa15ab
--- /dev/null
+++ b/vendor/github.com/cespare/xxhash/xxhash_safe.go
@@ -0,0 +1,10 @@
+// +build appengine
+
+// This file contains the safe implementations of otherwise unsafe-using code.
+
+package xxhash
+
+// Sum64String computes the 64-bit xxHash digest of s.
+func Sum64String(s string) uint64 {
+ return Sum64([]byte(s))
+}
diff --git a/vendor/github.com/cespare/xxhash/xxhash_unsafe.go b/vendor/github.com/cespare/xxhash/xxhash_unsafe.go
new file mode 100644
index 0000000..d2b64e8
--- /dev/null
+++ b/vendor/github.com/cespare/xxhash/xxhash_unsafe.go
@@ -0,0 +1,30 @@
+// +build !appengine
+
+// This file encapsulates usage of unsafe.
+// xxhash_safe.go contains the safe implementations.
+
+package xxhash
+
+import (
+ "reflect"
+ "unsafe"
+)
+
+// Sum64String computes the 64-bit xxHash digest of s.
+// It may be faster than Sum64([]byte(s)) by avoiding a copy.
+//
+// TODO(caleb): Consider removing this if an optimization is ever added to make
+// it unnecessary: https://golang.org/issue/2205.
+//
+// TODO(caleb): We still have a function call; we could instead write Go/asm
+// copies of Sum64 for strings to squeeze out a bit more speed.
+func Sum64String(s string) uint64 {
+ // See https://groups.google.com/d/msg/golang-nuts/dcjzJy-bSpw/tcZYBzQqAQAJ
+ // for some discussion about this unsafe conversion.
+ var b []byte
+ bh := (*reflect.SliceHeader)(unsafe.Pointer(&b))
+ bh.Data = (*reflect.StringHeader)(unsafe.Pointer(&s)).Data
+ bh.Len = len(s)
+ bh.Cap = len(s)
+ return Sum64(b)
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/adapter_proxy.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/adapter_proxy.go
index 02fa3de..bbae0ed 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/adapter_proxy.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/adapter_proxy.go
@@ -17,6 +17,7 @@
import (
"context"
+ "github.com/opencord/voltha-lib-go/v3/pkg/db"
"time"
"github.com/golang/protobuf/proto"
@@ -32,14 +33,17 @@
kafkaICProxy kafka.InterContainerProxy
adapterTopic string
coreTopic string
+ endpointMgr kafka.EndpointManager
}
-func NewAdapterProxy(kafkaProxy kafka.InterContainerProxy, adapterTopic string, coreTopic string) *AdapterProxy {
- var proxy AdapterProxy
- proxy.kafkaICProxy = kafkaProxy
- proxy.adapterTopic = adapterTopic
- proxy.coreTopic = coreTopic
- logger.Debugw("TOPICS", log.Fields{"core": proxy.coreTopic, "adapter": proxy.adapterTopic})
+func NewAdapterProxy(kafkaProxy kafka.InterContainerProxy, adapterTopic string, coreTopic string, backend *db.Backend) *AdapterProxy {
+ proxy := AdapterProxy{
+ kafkaICProxy: kafkaProxy,
+ adapterTopic: adapterTopic,
+ coreTopic: coreTopic,
+ endpointMgr: kafka.NewEndpointManager(backend),
+ }
+ logger.Debugw("topics", log.Fields{"core": proxy.coreTopic, "adapter": proxy.adapterTopic})
return &proxy
}
@@ -87,7 +91,11 @@
}
// Set up the required rpc arguments
- topic := kafka.Topic{Name: toAdapter}
+ endpoint, err := ap.endpointMgr.GetEndpoint(toDeviceId, toAdapter)
+ if err != nil {
+ return err
+ }
+ topic := kafka.Topic{Name: string(endpoint)}
replyToTopic := kafka.Topic{Name: fromAdapter}
rpc := "process_inter_adapter_message"
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/core_proxy.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/core_proxy.go
index 9582f33..20e1a52 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/core_proxy.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/core_proxy.go
@@ -99,6 +99,28 @@
topic := kafka.Topic{Name: ap.coreTopic}
replyToTopic := ap.getAdapterTopic()
args := make([]*kafka.KVArg, 2)
+
+ if adapter.TotalReplicas == 0 && adapter.CurrentReplica != 0 {
+ log.Fatal("totalReplicas can't be 0, since you're here you have at least one")
+ }
+
+ if adapter.CurrentReplica == 0 && adapter.TotalReplicas != 0 {
+ log.Fatal("currentReplica can't be 0, it has to start from 1")
+ }
+
+ if adapter.CurrentReplica == 0 && adapter.TotalReplicas == 0 {
+ // if the adapter is not setting these fields they default to 0,
+ // in that case it means the adapter is not ready to be scaled and thus it defaults
+ // to a single instance
+ adapter.CurrentReplica = 1
+ adapter.TotalReplicas = 1
+ }
+
+ if adapter.CurrentReplica > adapter.TotalReplicas {
+ log.Fatalf("CurrentReplica (%d) can't be greater than TotalReplicas (%d)",
+ adapter.CurrentReplica, adapter.TotalReplicas)
+ }
+
args[0] = &kafka.KVArg{
Key: "adapter",
Value: adapter,
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/configmanager.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/configmanager.go
index 9f08b0d..724ad32 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/configmanager.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/configmanager.go
@@ -63,10 +63,10 @@
ConfigAttribute string
}
-// ConfigManager is a wrapper over backend to maintain Configuration of voltha components
+// ConfigManager is a wrapper over Backend to maintain Configuration of voltha components
// in kvstore based persistent storage
type ConfigManager struct {
- backend *db.Backend
+ Backend *db.Backend
KvStoreConfigPrefix string
}
@@ -95,7 +95,7 @@
return &ConfigManager{
KvStoreConfigPrefix: defaultkvStoreConfigPath,
- backend: &db.Backend{
+ Backend: &db.Backend{
Client: kvClient,
StoreType: kvStoreType,
Host: kvStoreHost,
@@ -108,12 +108,12 @@
// RetrieveComponentList list the component Names for which loglevel is stored in kvstore
func (c *ConfigManager) RetrieveComponentList(ctx context.Context, configType ConfigType) ([]string, error) {
- data, err := c.backend.List(ctx, c.KvStoreConfigPrefix)
+ data, err := c.Backend.List(ctx, c.KvStoreConfigPrefix)
if err != nil {
return nil, err
}
- // Looping through the data recieved from the backend for config
+ // Looping through the data recieved from the Backend for config
// Trimming and Splitting the required key and value from data and storing as componentName,PackageName and Level
// For Example, recieved key would be <Backend Prefix Path>/<Config Prefix>/<Component Name>/<Config Type>/default and value \"DEBUG\"
// Then in default will be stored as PackageName,componentName as <Component Name> and DEBUG will be stored as value in List struct
@@ -168,14 +168,14 @@
c.changeEventChan = make(chan *ConfigChangeEvent, 1)
- c.kvStoreEventChan = c.cManager.backend.CreateWatch(ctx, key, true)
+ c.kvStoreEventChan = c.cManager.Backend.CreateWatch(ctx, key, true)
go c.processKVStoreWatchEvents()
return c.changeEventChan
}
-// processKVStoreWatchEvents process event channel recieved from the backend for any ChangeType
+// processKVStoreWatchEvents process event channel recieved from the Backend for any ChangeType
// It checks for the EventType is valid or not.For the valid EventTypes creates ConfigChangeEvent and send it on channel
func (c *ComponentConfig) processKVStoreWatchEvents() {
@@ -183,7 +183,7 @@
logger.Debugw("processing-kvstore-event-change", log.Fields{"key-prefix": ccKeyPrefix})
- ccPathPrefix := c.cManager.backend.PathPrefix + ccKeyPrefix + kvStorePathSeparator
+ ccPathPrefix := c.cManager.Backend.PathPrefix + ccKeyPrefix + kvStorePathSeparator
for watchResp := range c.kvStoreEventChan {
@@ -210,7 +210,7 @@
logger.Debugw("retrieving-config", log.Fields{"key": key})
- if kvpair, err := c.cManager.backend.Get(ctx, key); err != nil {
+ if kvpair, err := c.cManager.Backend.Get(ctx, key); err != nil {
return "", err
} else {
if kvpair == nil {
@@ -228,17 +228,17 @@
logger.Debugw("retreiving-list", log.Fields{"key": key})
- data, err := c.cManager.backend.List(ctx, key)
+ data, err := c.cManager.Backend.List(ctx, key)
if err != nil {
return nil, err
}
- // Looping through the data recieved from the backend for the given key
+ // Looping through the data recieved from the Backend for the given key
// Trimming the required key and value from data and storing as key/value pair
// For Example, recieved key would be <Backend Prefix Path>/<Config Prefix>/<Component Name>/<Config Type>/default and value \"DEBUG\"
// Then in default will be stored as key and DEBUG will be stored as value in map[string]string
res := make(map[string]string)
- ccPathPrefix := c.cManager.backend.PathPrefix + kvStorePathSeparator + key + kvStorePathSeparator
+ ccPathPrefix := c.cManager.Backend.PathPrefix + kvStorePathSeparator + key + kvStorePathSeparator
for attr, val := range data {
res[strings.TrimPrefix(attr, ccPathPrefix)] = strings.Trim(fmt.Sprintf("%s", val.Value), "\"")
}
@@ -252,7 +252,7 @@
logger.Debugw("saving-config", log.Fields{"key": key, "value": configValue})
//save the data for update config
- if err := c.cManager.backend.Put(ctx, key, configValue); err != nil {
+ if err := c.cManager.Backend.Put(ctx, key, configValue); err != nil {
return err
}
return nil
@@ -264,7 +264,7 @@
logger.Debugw("deleting-config", log.Fields{"key": key})
//delete the config
- if err := c.cManager.backend.Delete(ctx, key); err != nil {
+ if err := c.cManager.Backend.Delete(ctx, key); err != nil {
return err
}
return nil
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/logcontroller.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/logcontroller.go
index b929c9d..9c36241 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/logcontroller.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/logcontroller.go
@@ -15,7 +15,7 @@
*/
// Package Config provides dynamic logging configuration for specific Voltha component with loglevel lookup
-// from etcd kvstore implemented using backend.
+// from etcd kvstore implemented using Backend.
// Any Voltha component can start utilizing dynamic logging by starting goroutine of StartLogLevelConfigProcessing after
// starting kvClient for the component.
@@ -121,8 +121,8 @@
}
// ProcessLogConfig will first load and apply log config and then start waiting on component config and global config
-// channels for any changes. Event channel will be recieved from backend for valid change type
-// Then data for componentn log config and global log config will be retrieved from backend and stored in updatedLogConfig in precedence order
+// channels for any changes. Event channel will be recieved from Backend for valid change type
+// Then data for componentn log config and global log config will be retrieved from Backend and stored in updatedLogConfig in precedence order
// If any changes in updatedLogConfig will be applied on component
func (c *ComponentLogController) processLogConfig(ctx context.Context) {
@@ -247,7 +247,7 @@
return componentLogConfig, nil
}
-// buildUpdatedLogConfig retrieve the global logConfig and component logConfig from backend
+// buildUpdatedLogConfig retrieve the global logConfig and component logConfig from Backend
// component logConfig stores the log config with precedence order
// For example, If the global logConfig is set and component logConfig is set only for specific package then
// component logConfig is stored with global logConfig and component logConfig of specific package
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/endpoint_manager.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/endpoint_manager.go
new file mode 100644
index 0000000..4c13c76
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/endpoint_manager.go
@@ -0,0 +1,352 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka
+
+import (
+ "context"
+ "fmt"
+ "github.com/buraksezer/consistent"
+ "github.com/cespare/xxhash"
+ "github.com/golang/protobuf/proto"
+ "github.com/opencord/voltha-lib-go/v3/pkg/db"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ "github.com/opencord/voltha-protos/v3/go/voltha"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "sync"
+)
+
+const (
+ // All the values below can be tuned to get optimal data distribution. The numbers below seems to work well when
+ // supporting 1000-10000 devices and 1 - 20 replicas of a service
+
+ // Keys are distributed among partitions. Prime numbers are good to distribute keys uniformly.
+ DefaultPartitionCount = 1117
+
+ // Represents how many times a node is replicated on the consistent ring.
+ DefaultReplicationFactor = 117
+
+ // Load is used to calculate average load.
+ DefaultLoad = 1.1
+)
+
+type Endpoint string // Endpoint of a service instance. When using kafka, this is the topic name of a service
+type ReplicaID int32 // The replication ID of a service instance
+
+type EndpointManager interface {
+
+ // GetEndpoint is called to get the endpoint to communicate with for a specific device and service type. For
+ // now this will return the topic name
+ GetEndpoint(deviceID string, serviceType string) (Endpoint, error)
+
+ // IsDeviceOwnedByService is invoked when a specific service (service type + replicaNumber) is restarted and
+ // devices owned by that service need to be reconciled
+ IsDeviceOwnedByService(deviceID string, serviceType string, replicaNumber int32) (bool, error)
+
+ // getReplicaAssignment returns the replica number of the service that owns the deviceID. This is used by the
+ // test only
+ getReplicaAssignment(deviceID string, serviceType string) (ReplicaID, error)
+}
+
+type service struct {
+ id string // Id of the service. The same id is used for all replicas
+ totalReplicas int32
+ replicas map[ReplicaID]Endpoint
+ consistentRing *consistent.Consistent
+}
+
+type endpointManager struct {
+ partitionCount int
+ replicationFactor int
+ load float64
+ backend *db.Backend
+ services map[string]*service
+ servicesLock sync.RWMutex
+ deviceTypeServiceMap map[string]string
+ deviceTypeServiceMapLock sync.RWMutex
+}
+
+type EndpointManagerOption func(*endpointManager)
+
+func PartitionCount(count int) EndpointManagerOption {
+ return func(args *endpointManager) {
+ args.partitionCount = count
+ }
+}
+
+func ReplicationFactor(replicas int) EndpointManagerOption {
+ return func(args *endpointManager) {
+ args.replicationFactor = replicas
+ }
+}
+
+func Load(load float64) EndpointManagerOption {
+ return func(args *endpointManager) {
+ args.load = load
+ }
+}
+
+func newEndpointManager(backend *db.Backend, opts ...EndpointManagerOption) EndpointManager {
+ tm := &endpointManager{
+ partitionCount: DefaultPartitionCount,
+ replicationFactor: DefaultReplicationFactor,
+ load: DefaultLoad,
+ backend: backend,
+ services: make(map[string]*service),
+ deviceTypeServiceMap: make(map[string]string),
+ }
+
+ for _, option := range opts {
+ option(tm)
+ }
+ return tm
+}
+
+func NewEndpointManager(backend *db.Backend, opts ...EndpointManagerOption) EndpointManager {
+ return newEndpointManager(backend, opts...)
+}
+
+func (ep *endpointManager) GetEndpoint(deviceID string, serviceType string) (Endpoint, error) {
+ logger.Debugw("getting-endpoint", log.Fields{"device-id": deviceID, "service": serviceType})
+ owner, err := ep.getOwner(deviceID, serviceType)
+ if err != nil {
+ return "", err
+ }
+ m, ok := owner.(Member)
+ if !ok {
+ return "", status.Errorf(codes.Aborted, "invalid-member-%v", owner)
+ }
+ endpoint := m.getEndPoint()
+ if endpoint == "" {
+ return "", status.Errorf(codes.Unavailable, "endpoint-not-set-%s", serviceType)
+ }
+ logger.Debugw("returning-endpoint", log.Fields{"device-id": deviceID, "service": serviceType, "endpoint": endpoint})
+ return endpoint, nil
+}
+
+func (ep *endpointManager) IsDeviceOwnedByService(deviceID string, serviceType string, replicaNumber int32) (bool, error) {
+ logger.Debugw("device-ownership", log.Fields{"device-id": deviceID, "service": serviceType, "replica-number": replicaNumber})
+ owner, err := ep.getOwner(deviceID, serviceType)
+ if err != nil {
+ return false, nil
+ }
+ m, ok := owner.(Member)
+ if !ok {
+ return false, status.Errorf(codes.Aborted, "invalid-member-%v", owner)
+ }
+ return m.getReplica() == ReplicaID(replicaNumber), nil
+}
+
+func (ep *endpointManager) getReplicaAssignment(deviceID string, serviceType string) (ReplicaID, error) {
+ owner, err := ep.getOwner(deviceID, serviceType)
+ if err != nil {
+ return 0, nil
+ }
+ m, ok := owner.(Member)
+ if !ok {
+ return 0, status.Errorf(codes.Aborted, "invalid-member-%v", owner)
+ }
+ return m.getReplica(), nil
+}
+
+func (ep *endpointManager) getOwner(deviceID string, serviceType string) (consistent.Member, error) {
+ serv, dType, err := ep.getServiceAndDeviceType(serviceType)
+ if err != nil {
+ return nil, err
+ }
+ key := ep.makeKey(deviceID, dType, serviceType)
+ return serv.consistentRing.LocateKey(key), nil
+}
+
+func (ep *endpointManager) getServiceAndDeviceType(serviceType string) (*service, string, error) {
+ // Check whether service exist
+ ep.servicesLock.RLock()
+ serv, serviceExist := ep.services[serviceType]
+ ep.servicesLock.RUnlock()
+
+ // Load the service and device types if needed
+ if !serviceExist || serv == nil || int(serv.totalReplicas) != len(serv.consistentRing.GetMembers()) {
+ if err := ep.loadServices(); err != nil {
+ return nil, "", err
+ }
+
+ // Check whether the service exists now
+ ep.servicesLock.RLock()
+ serv, serviceExist = ep.services[serviceType]
+ ep.servicesLock.RUnlock()
+ if !serviceExist || serv == nil || int(serv.totalReplicas) != len(serv.consistentRing.GetMembers()) {
+ return nil, "", status.Errorf(codes.NotFound, "service-%s", serviceType)
+ }
+ }
+
+ ep.deviceTypeServiceMapLock.RLock()
+ defer ep.deviceTypeServiceMapLock.RUnlock()
+ for dType, sType := range ep.deviceTypeServiceMap {
+ if sType == serviceType {
+ return serv, dType, nil
+ }
+ }
+ return nil, "", status.Errorf(codes.NotFound, "service-%s", serviceType)
+}
+
+func (ep *endpointManager) getConsistentConfig() consistent.Config {
+ return consistent.Config{
+ PartitionCount: ep.partitionCount,
+ ReplicationFactor: ep.replicationFactor,
+ Load: ep.load,
+ Hasher: hasher{},
+ }
+}
+
+// loadServices loads the services (adapters) and device types in memory. Because of the small size of the data and
+// the data format in the dB being binary protobuf then it is better to load all the data if inconsistency is detected,
+// instead of watching for updates in the dB and acting on it.
+func (ep *endpointManager) loadServices() error {
+ ep.servicesLock.Lock()
+ defer ep.servicesLock.Unlock()
+ ep.deviceTypeServiceMapLock.Lock()
+ defer ep.deviceTypeServiceMapLock.Unlock()
+
+ if ep.backend == nil {
+ return status.Error(codes.Aborted, "backend-not-set")
+ }
+ ep.services = make(map[string]*service)
+ ep.deviceTypeServiceMap = make(map[string]string)
+
+ // Load the adapters
+ blobs, err := ep.backend.List(context.Background(), "adapters")
+ if err != nil {
+ return err
+ }
+
+ // Data is marshalled as proto bytes in the data store
+ for _, blob := range blobs {
+ data := blob.Value.([]byte)
+ adapter := &voltha.Adapter{}
+ if err := proto.Unmarshal(data, adapter); err != nil {
+ return err
+ }
+ // A valid adapter should have the vendorID set
+ if adapter.Vendor != "" {
+ if _, ok := ep.services[adapter.Type]; !ok {
+ ep.services[adapter.Type] = &service{
+ id: adapter.Type,
+ totalReplicas: adapter.TotalReplicas,
+ replicas: make(map[ReplicaID]Endpoint),
+ consistentRing: consistent.New(nil, ep.getConsistentConfig()),
+ }
+
+ }
+ currentReplica := ReplicaID(adapter.CurrentReplica)
+ endpoint := Endpoint(adapter.Endpoint)
+ ep.services[adapter.Type].replicas[currentReplica] = endpoint
+ ep.services[adapter.Type].consistentRing.Add(newMember(adapter.Id, adapter.Type, adapter.Vendor, endpoint, adapter.Version, currentReplica))
+ }
+ }
+ // Load the device types
+ blobs, err = ep.backend.List(context.Background(), "device_types")
+ if err != nil {
+ return err
+ }
+ for _, blob := range blobs {
+ data := blob.Value.([]byte)
+ deviceType := &voltha.DeviceType{}
+ if err := proto.Unmarshal(data, deviceType); err != nil {
+ return err
+ }
+ if _, ok := ep.deviceTypeServiceMap[deviceType.Id]; !ok {
+ ep.deviceTypeServiceMap[deviceType.Id] = deviceType.Adapter
+ }
+ }
+
+ // Log the loaded data in debug mode to facilitate trouble shooting
+ if logger.V(log.DebugLevel) {
+ for key, val := range ep.services {
+ members := val.consistentRing.GetMembers()
+ logger.Debugw("service", log.Fields{"service": key, "expected-replica": val.totalReplicas, "replicas": len(val.consistentRing.GetMembers())})
+ for _, m := range members {
+ n := m.(Member)
+ logger.Debugw("service-loaded", log.Fields{"serviceId": n.getID(), "serviceType": n.getServiceType(), "replica": n.getReplica(), "endpoint": n.getEndPoint()})
+ }
+ }
+ logger.Debugw("device-types-loaded", log.Fields{"device-types": ep.deviceTypeServiceMap})
+ }
+ return nil
+}
+
+// makeKey creates the string that the hash function uses to create the hash
+func (ep *endpointManager) makeKey(deviceID string, deviceType string, serviceType string) []byte {
+ return []byte(fmt.Sprintf("%s_%s_%s", serviceType, deviceType, deviceID))
+}
+
+// The consistent package requires a hasher function
+type hasher struct{}
+
+// Sum64 provides the hasher function. Based upon numerous testing scenarios, the xxhash package seems to provide the
+// best distribution compare to other hash packages
+func (h hasher) Sum64(data []byte) uint64 {
+ return xxhash.Sum64(data)
+}
+
+// Member represents a member on the consistent ring
+type Member interface {
+ String() string
+ getReplica() ReplicaID
+ getEndPoint() Endpoint
+ getID() string
+ getServiceType() string
+}
+
+// member implements the Member interface
+type member struct {
+ id string
+ serviceType string
+ vendor string
+ version string
+ replica ReplicaID
+ endpoint Endpoint
+}
+
+func newMember(ID string, serviceType string, vendor string, endPoint Endpoint, version string, replica ReplicaID) Member {
+ return &member{
+ id: ID,
+ serviceType: serviceType,
+ vendor: vendor,
+ version: version,
+ replica: replica,
+ endpoint: endPoint,
+ }
+}
+
+func (m *member) String() string {
+ return string(m.endpoint)
+}
+
+func (m *member) getReplica() ReplicaID {
+ return m.replica
+}
+
+func (m *member) getEndPoint() Endpoint {
+ return m.endpoint
+}
+
+func (m *member) getID() string {
+ return m.id
+}
+
+func (m *member) getServiceType() string {
+ return m.serviceType
+}
diff --git a/vendor/github.com/opencord/voltha-protos/v3/go/voltha/adapter.pb.go b/vendor/github.com/opencord/voltha-protos/v3/go/voltha/adapter.pb.go
index 4cc76e0..9359dc1 100644
--- a/vendor/github.com/opencord/voltha-protos/v3/go/voltha/adapter.pb.go
+++ b/vendor/github.com/opencord/voltha-protos/v3/go/voltha/adapter.pb.go
@@ -65,8 +65,8 @@
// Adapter (software plugin)
type Adapter struct {
- // Unique name of adapter, matching the python package name under
- // voltha/adapters.
+ // the adapter ID has to be unique,
+ // it will be generated as Type + CurrentReplica
Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
Vendor string `protobuf:"bytes,2,opt,name=vendor,proto3" json:"vendor,omitempty"`
Version string `protobuf:"bytes,3,opt,name=version,proto3" json:"version,omitempty"`
@@ -76,10 +76,16 @@
AdditionalDescription *any.Any `protobuf:"bytes,64,opt,name=additional_description,json=additionalDescription,proto3" json:"additional_description,omitempty"`
LogicalDeviceIds []string `protobuf:"bytes,4,rep,name=logical_device_ids,json=logicalDeviceIds,proto3" json:"logical_device_ids,omitempty"`
// timestamp when the adapter last sent a message to the core
- LastCommunication *timestamp.Timestamp `protobuf:"bytes,5,opt,name=last_communication,json=lastCommunication,proto3" json:"last_communication,omitempty"`
- XXX_NoUnkeyedLiteral struct{} `json:"-"`
- XXX_unrecognized []byte `json:"-"`
- XXX_sizecache int32 `json:"-"`
+ LastCommunication *timestamp.Timestamp `protobuf:"bytes,5,opt,name=last_communication,json=lastCommunication,proto3" json:"last_communication,omitempty"`
+ CurrentReplica int32 `protobuf:"varint,6,opt,name=currentReplica,proto3" json:"currentReplica,omitempty"`
+ TotalReplicas int32 `protobuf:"varint,7,opt,name=totalReplicas,proto3" json:"totalReplicas,omitempty"`
+ Endpoint string `protobuf:"bytes,8,opt,name=endpoint,proto3" json:"endpoint,omitempty"`
+ // all replicas of the same adapter will have the same type
+ // it is used to associate a device to an adapter
+ Type string `protobuf:"bytes,9,opt,name=type,proto3" json:"type,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
}
func (m *Adapter) Reset() { *m = Adapter{} }
@@ -156,6 +162,34 @@
return nil
}
+func (m *Adapter) GetCurrentReplica() int32 {
+ if m != nil {
+ return m.CurrentReplica
+ }
+ return 0
+}
+
+func (m *Adapter) GetTotalReplicas() int32 {
+ if m != nil {
+ return m.TotalReplicas
+ }
+ return 0
+}
+
+func (m *Adapter) GetEndpoint() string {
+ if m != nil {
+ return m.Endpoint
+ }
+ return ""
+}
+
+func (m *Adapter) GetType() string {
+ if m != nil {
+ return m.Type
+ }
+ return ""
+}
+
type Adapters struct {
Items []*Adapter `protobuf:"bytes,1,rep,name=items,proto3" json:"items,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
@@ -204,31 +238,34 @@
func init() { proto.RegisterFile("voltha_protos/adapter.proto", fileDescriptor_7e998ce153307274) }
var fileDescriptor_7e998ce153307274 = []byte{
- // 405 bytes of a gzipped FileDescriptorProto
- 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x92, 0xd1, 0x6a, 0xdb, 0x30,
- 0x14, 0x86, 0x71, 0xb2, 0xb8, 0xab, 0x4a, 0x59, 0xaa, 0x2d, 0xc3, 0xf3, 0x28, 0x35, 0x81, 0x81,
- 0x2f, 0x56, 0x99, 0xb5, 0x2f, 0xb0, 0xa4, 0xbd, 0xe9, 0xad, 0x28, 0xbb, 0xd8, 0x8d, 0x51, 0x24,
- 0xd5, 0x15, 0xd8, 0x3a, 0xc6, 0x52, 0x0c, 0x7d, 0xc8, 0xbd, 0xc1, 0x1e, 0x60, 0x4f, 0xb0, 0xeb,
- 0x11, 0x49, 0x26, 0x4e, 0x06, 0xbd, 0x32, 0xfa, 0xbf, 0xff, 0xfc, 0xe7, 0x1c, 0xc9, 0xe8, 0x73,
- 0x0f, 0xb5, 0x7d, 0x66, 0x65, 0xdb, 0x81, 0x05, 0x53, 0x30, 0xc1, 0x5a, 0x2b, 0x3b, 0xe2, 0x8e,
- 0x38, 0xf6, 0x30, 0xfd, 0x54, 0x01, 0x54, 0xb5, 0x2c, 0x9c, 0xba, 0xd9, 0x3e, 0x15, 0x4c, 0xbf,
- 0x78, 0x4b, 0x9a, 0x1e, 0xd6, 0x73, 0x68, 0x1a, 0xd0, 0x81, 0x25, 0x87, 0xac, 0x91, 0x96, 0x05,
- 0x72, 0x75, 0x1c, 0x68, 0x55, 0x23, 0x8d, 0x65, 0x4d, 0xeb, 0x0d, 0x4b, 0x8a, 0xce, 0x57, 0x7e,
- 0x94, 0x3b, 0xd0, 0x4f, 0xaa, 0xc2, 0x2b, 0x74, 0xc1, 0x84, 0x50, 0x56, 0x81, 0x66, 0x75, 0xc9,
- 0x9d, 0x98, 0x7c, 0xcf, 0xa2, 0xfc, 0xec, 0xe6, 0x03, 0xf1, 0x69, 0x64, 0x48, 0x23, 0x2b, 0xfd,
- 0x42, 0xe7, 0x7b, 0xbb, 0x8f, 0x58, 0xfe, 0x9e, 0xa0, 0x93, 0x10, 0x8a, 0x17, 0x68, 0xa2, 0x44,
- 0x12, 0x65, 0x51, 0x7e, 0xba, 0x9e, 0xfd, 0xf9, 0xfb, 0xeb, 0x32, 0xa2, 0x13, 0x25, 0xf0, 0x25,
- 0x8a, 0x7b, 0xa9, 0x05, 0x74, 0xc9, 0x64, 0x8c, 0x82, 0x88, 0xaf, 0xd0, 0x49, 0x2f, 0x3b, 0xa3,
- 0x40, 0x27, 0xd3, 0x31, 0x1f, 0x54, 0x7c, 0x8d, 0xe2, 0x30, 0xda, 0xdc, 0x8d, 0xb6, 0x20, 0xfe,
- 0x0a, 0xc8, 0xc1, 0x32, 0x34, 0x98, 0x30, 0x45, 0x1f, 0x47, 0x4b, 0x09, 0x69, 0x78, 0xa7, 0xda,
- 0xdd, 0xe9, 0xb5, 0xcd, 0x86, 0xa6, 0x8b, 0x7d, 0xe9, 0xfd, 0xbe, 0x12, 0x7f, 0x45, 0xb8, 0x86,
- 0x4a, 0x71, 0x17, 0xd8, 0x2b, 0x2e, 0x4b, 0x25, 0x4c, 0xf2, 0x26, 0x9b, 0xe6, 0xa7, 0x74, 0x1e,
- 0xc8, 0xbd, 0x03, 0x0f, 0xc2, 0xe0, 0x07, 0x84, 0x6b, 0x66, 0x6c, 0xb9, 0x7b, 0xb7, 0xad, 0x56,
- 0x9c, 0xb9, 0xee, 0x33, 0xd7, 0x3d, 0xfd, 0xaf, 0xfb, 0xe3, 0xf0, 0x4a, 0xf4, 0x62, 0x57, 0x75,
- 0x37, 0x2e, 0x5a, 0x7e, 0x43, 0x6f, 0xc3, 0x96, 0x06, 0x7f, 0x41, 0x33, 0x65, 0x65, 0x63, 0x92,
- 0x28, 0x9b, 0xe6, 0x67, 0x37, 0xef, 0x8e, 0xae, 0x81, 0x7a, 0xba, 0x7e, 0x44, 0xef, 0xa1, 0xab,
- 0x08, 0xb4, 0x52, 0x73, 0xe8, 0x44, 0x70, 0xad, 0xcf, 0x7f, 0xb8, 0x6f, 0x30, 0xff, 0x24, 0x95,
- 0xb2, 0xcf, 0xdb, 0x0d, 0xe1, 0xd0, 0x14, 0x83, 0xb5, 0xf0, 0xd6, 0xeb, 0xf0, 0x6b, 0xf5, 0xb7,
- 0x45, 0x05, 0x41, 0xdb, 0xc4, 0x4e, 0xbc, 0xfd, 0x17, 0x00, 0x00, 0xff, 0xff, 0x4e, 0xc5, 0xdf,
- 0x09, 0xdb, 0x02, 0x00, 0x00,
+ // 463 bytes of a gzipped FileDescriptorProto
+ 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x52, 0xdd, 0x6e, 0xd3, 0x30,
+ 0x14, 0x56, 0xda, 0x35, 0x6d, 0x3d, 0x15, 0x3a, 0x43, 0x91, 0x09, 0x9a, 0x16, 0x55, 0x80, 0x72,
+ 0xc1, 0x12, 0xb1, 0xbd, 0x00, 0xed, 0x76, 0xb3, 0x5b, 0x6b, 0xe2, 0x82, 0x9b, 0xca, 0xb5, 0xbd,
+ 0xcc, 0x52, 0xe2, 0x13, 0xc5, 0x6e, 0xa4, 0x3e, 0x24, 0x2f, 0xc0, 0x13, 0xf0, 0x04, 0x5c, 0xa3,
+ 0x3a, 0x0e, 0xfd, 0x41, 0xda, 0x55, 0x72, 0xbe, 0xef, 0x3b, 0xdf, 0xf9, 0x33, 0xfa, 0xd0, 0x40,
+ 0x61, 0x9f, 0xd9, 0xaa, 0xaa, 0xc1, 0x82, 0xc9, 0x98, 0x60, 0x95, 0x95, 0x75, 0xea, 0x42, 0x1c,
+ 0xb6, 0x64, 0xf4, 0x3e, 0x07, 0xc8, 0x0b, 0x99, 0x39, 0x74, 0xbd, 0x79, 0xca, 0x98, 0xde, 0xb6,
+ 0x92, 0x28, 0x3a, 0xce, 0xe7, 0x50, 0x96, 0xa0, 0x3d, 0x47, 0x8e, 0xb9, 0x52, 0x5a, 0xe6, 0x99,
+ 0xab, 0x53, 0x43, 0xab, 0x4a, 0x69, 0x2c, 0x2b, 0xab, 0x56, 0x30, 0xa7, 0x68, 0xb2, 0x68, 0x5b,
+ 0xb9, 0x03, 0xfd, 0xa4, 0x72, 0xbc, 0x40, 0x17, 0x4c, 0x08, 0x65, 0x15, 0x68, 0x56, 0xac, 0xb8,
+ 0x03, 0xc9, 0xb7, 0x38, 0x48, 0xce, 0x6f, 0xde, 0xa6, 0xad, 0x5b, 0xda, 0xb9, 0xa5, 0x0b, 0xbd,
+ 0xa5, 0xd3, 0xbd, 0xbc, 0xb5, 0x98, 0xff, 0xea, 0xa3, 0xa1, 0x37, 0xc5, 0x33, 0xd4, 0x53, 0x82,
+ 0x04, 0x71, 0x90, 0x8c, 0x97, 0x83, 0xdf, 0x7f, 0x7e, 0x5e, 0x06, 0xb4, 0xa7, 0x04, 0xbe, 0x44,
+ 0x61, 0x23, 0xb5, 0x80, 0x9a, 0xf4, 0x0e, 0x29, 0x0f, 0xe2, 0x2b, 0x34, 0x6c, 0x64, 0x6d, 0x14,
+ 0x68, 0xd2, 0x3f, 0xe4, 0x3b, 0x14, 0x5f, 0xa3, 0xd0, 0xb7, 0x36, 0x75, 0xad, 0xcd, 0xd2, 0x76,
+ 0x05, 0xe9, 0xd1, 0x30, 0xd4, 0x8b, 0x30, 0x45, 0xef, 0x0e, 0x86, 0x12, 0xd2, 0xf0, 0x5a, 0x55,
+ 0xbb, 0xe8, 0xa5, 0xc9, 0xba, 0xa2, 0xb3, 0x7d, 0xea, 0xfd, 0x3e, 0x13, 0x7f, 0x41, 0xb8, 0x80,
+ 0x5c, 0x71, 0x67, 0xd8, 0x28, 0x2e, 0x57, 0x4a, 0x18, 0x72, 0x16, 0xf7, 0x93, 0x31, 0x9d, 0x7a,
+ 0xe6, 0xde, 0x11, 0x0f, 0xc2, 0xe0, 0x07, 0x84, 0x0b, 0x66, 0xec, 0x6a, 0x77, 0xb7, 0x8d, 0x56,
+ 0x9c, 0xb9, 0xea, 0x03, 0x57, 0x3d, 0xfa, 0xaf, 0xfa, 0x63, 0x77, 0x25, 0x7a, 0xb1, 0xcb, 0xba,
+ 0x3b, 0x4c, 0xc2, 0x9f, 0xd1, 0x2b, 0xbe, 0xa9, 0x6b, 0xa9, 0x2d, 0x95, 0x55, 0xa1, 0x38, 0x23,
+ 0x61, 0x1c, 0x24, 0x03, 0x7a, 0x82, 0xe2, 0x8f, 0x68, 0x62, 0xc1, 0xb2, 0xc2, 0xc7, 0x86, 0x0c,
+ 0x9d, 0xec, 0x18, 0xc4, 0x11, 0x1a, 0x49, 0x2d, 0x2a, 0x50, 0xda, 0x92, 0xd1, 0x6e, 0xd7, 0xf4,
+ 0x5f, 0x8c, 0x31, 0x3a, 0xb3, 0xdb, 0x4a, 0x92, 0xb1, 0xc3, 0xdd, 0xff, 0xfc, 0x2b, 0x1a, 0xf9,
+ 0x1d, 0x1b, 0xfc, 0x09, 0x0d, 0x94, 0x95, 0xa5, 0x21, 0x41, 0xdc, 0x4f, 0xce, 0x6f, 0x5e, 0x9f,
+ 0x1c, 0x81, 0xb6, 0xec, 0xf2, 0x11, 0xbd, 0x81, 0x3a, 0x4f, 0xa1, 0x92, 0x9a, 0x43, 0x2d, 0xbc,
+ 0x6a, 0x39, 0xf9, 0xee, 0xbe, 0x5e, 0xfc, 0x23, 0xcd, 0x95, 0x7d, 0xde, 0xac, 0x53, 0x0e, 0x65,
+ 0xd6, 0x49, 0xb3, 0x56, 0x7a, 0xed, 0x1f, 0x76, 0x73, 0x9b, 0xe5, 0xe0, 0xb1, 0x75, 0xe8, 0xc0,
+ 0xdb, 0xbf, 0x01, 0x00, 0x00, 0xff, 0xff, 0xef, 0x64, 0x5e, 0x10, 0x59, 0x03, 0x00, 0x00,
}
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 0b00fb4..b687f69 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -10,8 +10,12 @@
github.com/boljen/go-bitmap
# github.com/bsm/sarama-cluster v2.1.15+incompatible
github.com/bsm/sarama-cluster
+# github.com/buraksezer/consistent v0.0.0-20191006190839-693edf70fd72
+github.com/buraksezer/consistent
# github.com/cenkalti/backoff/v3 v3.1.1
github.com/cenkalti/backoff/v3
+# github.com/cespare/xxhash v1.1.0
+github.com/cespare/xxhash
# github.com/cevaris/ordered_map v0.0.0-20190319150403-3adeae072e73
github.com/cevaris/ordered_map
# github.com/coreos/go-systemd v0.0.0-20190620071333-e64a0ec8b42a
@@ -63,7 +67,7 @@
github.com/mitchellh/go-homedir
# github.com/mitchellh/mapstructure v1.1.2
github.com/mitchellh/mapstructure
-# github.com/opencord/voltha-lib-go/v3 v3.1.0
+# github.com/opencord/voltha-lib-go/v3 v3.1.1
github.com/opencord/voltha-lib-go/v3/pkg/adapters
github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif
github.com/opencord/voltha-lib-go/v3/pkg/adapters/common
@@ -78,7 +82,7 @@
github.com/opencord/voltha-lib-go/v3/pkg/probe
github.com/opencord/voltha-lib-go/v3/pkg/techprofile
github.com/opencord/voltha-lib-go/v3/pkg/version
-# github.com/opencord/voltha-protos/v3 v3.2.8
+# github.com/opencord/voltha-protos/v3 v3.3.0
github.com/opencord/voltha-protos/v3/go/common
github.com/opencord/voltha-protos/v3/go/inter_container
github.com/opencord/voltha-protos/v3/go/omci