[VOL-2835] Using different topic per ONU device
Change-Id: I3e55064292f28f9bf39ad6bc75fd5758f5313317
diff --git a/BUILD.md b/BUILD.md
index 0127947..7572210 100644
--- a/BUILD.md
+++ b/BUILD.md
@@ -1,5 +1,7 @@
# How to Build and Develop VOLTHA
+Please refer to this guide: https://docs.voltha.org/master/overview/dev_virtual.html
+
## Building natively on MAC OS X
For advanced developers this may provide a more comfortable developer environment
diff --git a/VERSION b/VERSION
index eef2640..a724a9c 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-2.4.0-dev0
+2.4.0-dev
diff --git a/compose/system-test-bbsim.yml b/compose/system-test-bbsim.yml
index 4a6e9d6..3370111 100644
--- a/compose/system-test-bbsim.yml
+++ b/compose/system-test-bbsim.yml
@@ -26,9 +26,9 @@
environment:
SERVICE_2181_NAME: "zookeeper"
ports:
- - 2181:2181
+ - 2181:2181
networks:
- - default
+ - default
restart: unless-stopped
@@ -43,9 +43,9 @@
volumes:
- /var/run/docker.sock:/var/run/docker.sock
ports:
- - 9092:9092
+ - 9092:9092
networks:
- - default
+ - default
restart: unless-stopped
@@ -63,37 +63,37 @@
"--initial-cluster-state=new"
]
ports:
- - "2379:2379"
- - 2380
- - 4001
+ - "2379:2379"
+ - 2380
+ - 4001
networks:
- - default
+ - default
restart: unless-stopped
rw_core:
image: "${DOCKER_REGISTRY}${DOCKER_REPOSITORY}voltha-rw-core:${DOCKER_TAG}"
entrypoint:
- - /app/rw_core
- - -kv_store_type=etcd
- - -kv_store_host=${DOCKER_HOST_IP}
- - -kv_store_port=2379
- - -grpc_port=50057
- - -banner=true
- - -kafka_adapter_host=${DOCKER_HOST_IP}
- - -kafka_adapter_port=9092
- - -kafka_cluster_host=${DOCKER_HOST_IP}
- - -kafka_cluster_port=9092
- - -rw_core_topic=rwcore
- - -kv_store_data_prefix=service/voltha
- - -in_competing_mode=false
- - -log_level=DEBUG
+ - /app/rw_core
+ - -kv_store_type=etcd
+ - -kv_store_host=${DOCKER_HOST_IP}
+ - -kv_store_port=2379
+ - -grpc_port=50057
+ - -banner=true
+ - -kafka_adapter_host=${DOCKER_HOST_IP}
+ - -kafka_adapter_port=9092
+ - -kafka_cluster_host=${DOCKER_HOST_IP}
+ - -kafka_cluster_port=9092
+ - -rw_core_topic=rwcore
+ - -kv_store_data_prefix=service/voltha
+ - -in_competing_mode=false
+ - -log_level=DEBUG
volumes:
- - "/var/run/docker.sock:/tmp/docker.sock"
+ - "/var/run/docker.sock:/tmp/docker.sock"
ports:
- 50057:50057
networks:
- - default
+ - default
restart: unless-stopped
@@ -108,23 +108,23 @@
volumes:
- "/var/run/docker.sock:/tmp/docker.sock"
networks:
- - default
+ - default
restart: unless-stopped
onos:
image: "${DOCKER_REGISTRY}${DOCKER_REPOSITORY}voltha-onos:${DOCKER_TAG}"
ports:
- - "8101:8101" # ssh
- - "6653:6653" # OF
- - "8181:8181" # UI
+ - "8101:8101" # ssh
+ - "6653:6653" # OF
+ - "8181:8181" # UI
environment:
ONOS_APPS: 'drivers,openflow-base'
volumes:
- - "/var/run/docker.sock:/tmp/docker.sock"
- - "./network-cfg-bbsim.json:/root/onos/config/network-cfg.json"
+ - "/var/run/docker.sock:/tmp/docker.sock"
+ - "./network-cfg-bbsim.json:/root/onos/config/network-cfg.json"
networks:
- - default
+ - default
restart: unless-stopped
@@ -144,7 +144,7 @@
ports:
- "50062:50062"
networks:
- - default
+ - default
restart: unless-stopped
@@ -161,7 +161,7 @@
"--log_level=DEBUG"
]
networks:
- - default
+ - default
restart: unless-stopped
@@ -180,10 +180,10 @@
"1"
]
ports:
- - "50060:50060"
- - "50074:50074"
+ - "50060:50060"
+ - "50074:50074"
networks:
- - default
+ - default
restart: unless-stopped
@@ -197,9 +197,9 @@
- "./radius-clients.conf:/etc/raddb/clients.conf"
- "./radius-users.conf:/etc/raddb/users"
ports:
- - "1812:1812/udp"
- - "1813:1813"
- - "18120:18120"
+ - "1812:1812/udp"
+ - "1813:1813"
+ - "18120:18120"
networks:
- - default
- restart: unless-stopped
+ - default
+ restart: unless-stopped
\ No newline at end of file
diff --git a/go.mod b/go.mod
index 2268c09..2a2b789 100644
--- a/go.mod
+++ b/go.mod
@@ -6,8 +6,8 @@
github.com/gogo/protobuf v1.3.0
github.com/golang/protobuf v1.3.2
github.com/google/uuid v1.1.1
- github.com/opencord/voltha-lib-go/v3 v3.1.0
- github.com/opencord/voltha-protos/v3 v3.2.8
+ github.com/opencord/voltha-lib-go/v3 v3.1.2
+ github.com/opencord/voltha-protos/v3 v3.3.0
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2
github.com/stretchr/testify v1.4.0
google.golang.org/grpc v1.24.0
diff --git a/go.sum b/go.sum
index 3a03962..61fd9aa 100644
--- a/go.sum
+++ b/go.sum
@@ -4,6 +4,8 @@
github.com/DataDog/zstd v1.3.6-0.20190409195224-796139022798/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
github.com/DataDog/zstd v1.4.1 h1:3oxKN3wbHibqx897utPC2LTQU4J+IHWWJO+glkAkpFM=
github.com/DataDog/zstd v1.4.1/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
+github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
+github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/Shopify/sarama v1.23.1 h1:XxJBCZEoWJtoWjf/xRbmGUpAmTZGnuuF0ON0EvxxBrs=
github.com/Shopify/sarama v1.23.1/go.mod h1:XLH1GYJnLVE0XCr6KdJGVJRTwY30moWNJ4sERjXX6fs=
github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc=
@@ -25,6 +27,10 @@
github.com/boljen/go-bitmap v0.0.0-20151001105940-23cd2fb0ce7d/go.mod h1:f1iKL6ZhUWvbk7PdWVmOaak10o86cqMUYEmn1CZNGEI=
github.com/bsm/sarama-cluster v2.1.15+incompatible h1:RkV6WiNRnqEEbp81druK8zYhmnIgdOjqSVi0+9Cnl2A=
github.com/bsm/sarama-cluster v2.1.15+incompatible/go.mod h1:r7ao+4tTNXvWm+VRpRJchr2kQhqxgmAp2iEX5W96gMM=
+github.com/buraksezer/consistent v0.0.0-20191006190839-693edf70fd72 h1:fUmDBbSvv1uOzo/t8WaxZMVb7BxJ8JECo5lGoR9c5bA=
+github.com/buraksezer/consistent v0.0.0-20191006190839-693edf70fd72/go.mod h1:OEE5igu/CDjGegM1Jn6ZMo7R6LlV/JChAkjfQQIRLpg=
+github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
+github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cevaris/ordered_map v0.0.0-20190319150403-3adeae072e73 h1:q1g9lSyo/nOIC3W5E3FK3Unrz8b9LdLXCyuC+ZcpPC0=
github.com/cevaris/ordered_map v0.0.0-20190319150403-3adeae072e73/go.mod h1:507vXsotcZop7NZfBWdhPmVeOse4ko2R7AagJYrpoEg=
github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag=
@@ -190,10 +196,10 @@
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.2 h1:3mYCb7aPxS/RU7TI1y4rkEn1oKmPRjNJLNEXgw7MH2I=
github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
-github.com/opencord/voltha-lib-go/v3 v3.1.0 h1:3KPBQT2s5HdCo6bwWHpNFeR8KYbDQMmZEq+aHim6U/o=
-github.com/opencord/voltha-lib-go/v3 v3.1.0/go.mod h1:iOFpFIonmGq6nYG4qYWrF5cKyt7UupDLyB4iGXFDRig=
-github.com/opencord/voltha-protos/v3 v3.2.8 h1:FuMEmUsW4WRpIsNCcELM9QBZZwBLRvuw85DVvfpZ1do=
-github.com/opencord/voltha-protos/v3 v3.2.8/go.mod h1:nl1ETp5Iw3avxOaKD8BJlYY5wYI4KeV95aT1pL63nto=
+github.com/opencord/voltha-lib-go/v3 v3.1.2 h1:1BUoi8cljTUlYueGCZCtrWf/a5+KhNupPTILOWEDZVo=
+github.com/opencord/voltha-lib-go/v3 v3.1.2/go.mod h1:ad7C/5/09RcYvGQrxUH4AuOiO8OSQqGmCgEJNEpaag8=
+github.com/opencord/voltha-protos/v3 v3.3.0 h1:1Q1C6nWSkjaJY87GQgc7hWU6kqjkWdM+rzqSXBKb0cQ=
+github.com/opencord/voltha-protos/v3 v3.3.0/go.mod h1:nl1ETp5Iw3avxOaKD8BJlYY5wYI4KeV95aT1pL63nto=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY=
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
@@ -234,6 +240,8 @@
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/soheilhy/cmux v0.1.4 h1:0HKaf1o97UwFjHH9o5XsHUOF+tqmdA7KEzXLpiyaw0E=
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
+github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ=
+github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ=
github.com/spf13/pflag v1.0.1/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
diff --git a/rw_core/core/adapter_manager.go b/rw_core/core/adapter_manager.go
index 1ed5b23..faece55 100644
--- a/rw_core/core/adapter_manager.go
+++ b/rw_core/core/adapter_manager.go
@@ -32,12 +32,6 @@
"github.com/opencord/voltha-protos/v3/go/voltha"
)
-// sentinel constants
-const (
- SentinelAdapterID = "adapter_sentinel"
- SentinelDevicetypeID = "device_type_sentinel"
-)
-
// AdapterAgent represents adapter agent
type AdapterAgent struct {
adapter *voltha.Adapter
@@ -58,15 +52,6 @@
return &adapterAgent
}
-func (aa *AdapterAgent) getDeviceType(deviceType string) *voltha.DeviceType {
- aa.lock.RLock()
- defer aa.lock.RUnlock()
- if _, exist := aa.deviceTypes[deviceType]; exist {
- return aa.deviceTypes[deviceType]
- }
- return nil
-}
-
func (aa *AdapterAgent) getAdapter() *voltha.Adapter {
aa.lock.RLock()
defer aa.lock.RUnlock()
@@ -74,12 +59,6 @@
return aa.adapter
}
-func (aa *AdapterAgent) updateDeviceType(deviceType *voltha.DeviceType) {
- aa.lock.Lock()
- defer aa.lock.Unlock()
- aa.deviceTypes[deviceType.Id] = deviceType
-}
-
// updateCommunicationTime updates the message to the specified time.
// No attempt is made to save the time to the db, so only recent times are guaranteed to be accurate.
func (aa *AdapterAgent) updateCommunicationTime(new time.Time) {
@@ -99,7 +78,7 @@
// AdapterManager represents adapter manager attributes
type AdapterManager struct {
adapterAgents map[string]*AdapterAgent
- deviceTypeToAdapterMap map[string]string
+ deviceTypes map[string]*voltha.DeviceType
clusterDataProxy *model.Proxy
deviceMgr *DeviceManager
coreInstanceID string
@@ -110,12 +89,12 @@
func newAdapterManager(cdProxy *model.Proxy, coreInstanceID string, kafkaClient kafka.Client, deviceMgr *DeviceManager) *AdapterManager {
aMgr := &AdapterManager{
- exitChannel: make(chan int, 1),
- coreInstanceID: coreInstanceID,
- clusterDataProxy: cdProxy,
- adapterAgents: make(map[string]*AdapterAgent),
- deviceTypeToAdapterMap: make(map[string]string),
- deviceMgr: deviceMgr,
+ exitChannel: make(chan int, 1),
+ coreInstanceID: coreInstanceID,
+ clusterDataProxy: cdProxy,
+ deviceTypes: make(map[string]*voltha.DeviceType),
+ adapterAgents: make(map[string]*AdapterAgent),
+ deviceMgr: deviceMgr,
}
kafkaClient.SubscribeForMetadata(aMgr.updateLastAdapterCommunication)
return aMgr
@@ -153,10 +132,6 @@
logger.Debugw("adapter added successfully", log.Fields{"adapterId": adapter.Id})
}
}
- } else {
- logger.Debug("no-existing-adapter-found")
- // No adapter data. In order to have a proxy setup for that path let's create a fake adapter
- return aMgr.addAdapter(&voltha.Adapter{Id: SentinelAdapterID}, true)
}
// Load the device types
@@ -175,8 +150,8 @@
}
logger.Debug("no-existing-device-type-found")
- // No device types data. In order to have a proxy setup for that path let's create a fake device type
- return aMgr.addDeviceTypes(&voltha.DeviceTypes{Items: []*voltha.DeviceType{{Id: SentinelDevicetypeID, Adapter: SentinelAdapterID}}}, true)
+
+ return nil
}
func (aMgr *AdapterManager) updateLastAdapterCommunication(adapterID string, timestamp int64) {
@@ -192,7 +167,8 @@
func (aMgr *AdapterManager) addAdapter(adapter *voltha.Adapter, saveToDb bool) error {
aMgr.lockAdaptersMap.Lock()
defer aMgr.lockAdaptersMap.Unlock()
- logger.Debugw("adding-adapter", log.Fields{"adapter": adapter})
+ logger.Debugw("adding-adapter", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
+ "currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint})
if _, exist := aMgr.adapterAgents[adapter.Id]; !exist {
if saveToDb {
// Save the adapter to the KV store - first check if it already exist
@@ -201,10 +177,17 @@
return err
} else if !have {
if err := aMgr.clusterDataProxy.AddWithID(context.Background(), "adapters", adapter.Id, adapter); err != nil {
- logger.Errorw("failed-to-save-adapter-to-cluster-proxy", log.Fields{"error": err})
+ logger.Errorw("failed-to-save-adapter", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
+ "currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint, "replica": adapter.CurrentReplica, "total": adapter.TotalReplicas})
return err
}
- logger.Debugw("adapter-saved-to-KV-Store", log.Fields{"adapter": adapter})
+ logger.Debugw("adapter-saved-to-KV-Store", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
+ "currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint, "replica": adapter.CurrentReplica, "total": adapter.TotalReplicas})
+ } else {
+ log.Warnw("adding-adapter-already-in-KV-store", log.Fields{
+ "adapterName": adapter.Id,
+ "adapterReplica": adapter.CurrentReplica,
+ })
}
}
clonedAdapter := (proto.Clone(adapter)).(*voltha.Adapter)
@@ -223,6 +206,11 @@
aMgr.lockdDeviceTypeToAdapterMap.Lock()
defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()
+ // create an in memory map to fetch the entire voltha.DeviceType from a device.Type string
+ for _, deviceType := range deviceTypes.Items {
+ aMgr.deviceTypes[deviceType.Id] = deviceType
+ }
+
if saveToDb {
// Save the device types to the KV store
for _, deviceType := range deviceTypes.Items {
@@ -240,17 +228,7 @@
}
}
}
- // and save locally
- for _, deviceType := range deviceTypes.Items {
- clonedDType := (proto.Clone(deviceType)).(*voltha.DeviceType)
- if adapterAgent, exist := aMgr.adapterAgents[clonedDType.Adapter]; exist {
- adapterAgent.updateDeviceType(clonedDType)
- } else {
- logger.Debugw("adapter-not-exist", log.Fields{"deviceTypes": deviceTypes, "adapterId": clonedDType.Adapter})
- aMgr.adapterAgents[clonedDType.Adapter] = newAdapterAgent(&voltha.Adapter{Id: clonedDType.Adapter}, deviceTypes)
- }
- aMgr.deviceTypeToAdapterMap[clonedDType.Id] = clonedDType.Adapter
- }
+
return nil
}
@@ -260,9 +238,7 @@
defer aMgr.lockAdaptersMap.RUnlock()
for _, adapterAgent := range aMgr.adapterAgents {
if a := adapterAgent.getAdapter(); a != nil {
- if a.Id != SentinelAdapterID { // don't report the sentinel
- result.Items = append(result.Items, (proto.Clone(a)).(*voltha.Adapter))
- }
+ result.Items = append(result.Items, (proto.Clone(a)).(*voltha.Adapter))
}
}
return result, nil
@@ -278,7 +254,8 @@
}
func (aMgr *AdapterManager) registerAdapter(adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) (*voltha.CoreInstance, error) {
- logger.Debugw("registerAdapter", log.Fields{"adapter": adapter, "deviceTypes": deviceTypes.Items})
+ logger.Debugw("registerAdapter", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
+ "currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint, "deviceTypes": deviceTypes.Items})
if aMgr.getAdapter(adapter.Id) != nil {
// Already registered - Adapter may have restarted. Trigger the reconcile process for that adapter
@@ -300,17 +277,20 @@
return nil, err
}
- logger.Debugw("adapter-registered", log.Fields{"adapter": adapter.Id})
+ logger.Debugw("adapter-registered", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
+ "currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint})
return &voltha.CoreInstance{InstanceId: aMgr.coreInstanceID}, nil
}
-//getAdapterName returns the name of the device adapter that service this device type
-func (aMgr *AdapterManager) getAdapterName(deviceType string) (string, error) {
+// getAdapterType returns the name of the device adapter that service this device type
+func (aMgr *AdapterManager) getAdapterType(deviceType string) (string, error) {
aMgr.lockdDeviceTypeToAdapterMap.Lock()
defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()
- if adapterID, exist := aMgr.deviceTypeToAdapterMap[deviceType]; exist {
- return adapterID, nil
+ for _, adapterAgent := range aMgr.adapterAgents {
+ if deviceType == adapterAgent.adapter.Type {
+ return adapterAgent.adapter.Type, nil
+ }
}
return "", fmt.Errorf("Adapter-not-registered-for-device-type %s", deviceType)
}
@@ -319,16 +299,12 @@
aMgr.lockdDeviceTypeToAdapterMap.Lock()
defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()
- deviceTypes := make([]*voltha.DeviceType, 0, len(aMgr.deviceTypeToAdapterMap))
- for deviceTypeID, adapterID := range aMgr.deviceTypeToAdapterMap {
- if adapterAgent, have := aMgr.adapterAgents[adapterID]; have {
- if deviceType := adapterAgent.getDeviceType(deviceTypeID); deviceType != nil {
- if deviceType.Id != SentinelDevicetypeID { // don't report the sentinel
- deviceTypes = append(deviceTypes, deviceType)
- }
- }
- }
+ deviceTypes := make([]*voltha.DeviceType, 0, len(aMgr.deviceTypes))
+
+ for _, deviceType := range aMgr.deviceTypes {
+ deviceTypes = append(deviceTypes, deviceType)
}
+
return deviceTypes
}
@@ -337,10 +313,9 @@
aMgr.lockdDeviceTypeToAdapterMap.Lock()
defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()
- if adapterID, exist := aMgr.deviceTypeToAdapterMap[deviceType]; exist {
- if adapterAgent := aMgr.adapterAgents[adapterID]; adapterAgent != nil {
- return adapterAgent.getDeviceType(deviceType)
- }
+ if deviceType, exist := aMgr.deviceTypes[deviceType]; exist {
+ return deviceType
}
+
return nil
}
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index d3907bb..1e18ba4 100755
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -30,14 +30,16 @@
deviceTopicRegistered bool
corePairTopic string
kafkaICProxy kafka.InterContainerProxy
+ endpointManager kafka.EndpointManager
}
// NewAdapterProxy will return adapter proxy instance
-func NewAdapterProxy(kafkaProxy kafka.InterContainerProxy, corePairTopic string) *AdapterProxy {
+func NewAdapterProxy(kafkaProxy kafka.InterContainerProxy, corePairTopic string, endpointManager kafka.EndpointManager) *AdapterProxy {
return &AdapterProxy{
kafkaICProxy: kafkaProxy,
corePairTopic: corePairTopic,
deviceTopicRegistered: false,
+ endpointManager: endpointManager,
}
}
@@ -45,8 +47,14 @@
return kafka.Topic{Name: ap.corePairTopic}
}
-func (ap *AdapterProxy) getAdapterTopic(adapterName string) kafka.Topic {
- return kafka.Topic{Name: adapterName}
+func (ap *AdapterProxy) getAdapterTopic(deviceID string, adapterType string) (*kafka.Topic, error) {
+
+ endpoint, err := ap.endpointManager.GetEndpoint(deviceID, adapterType)
+ if err != nil {
+ return nil, err
+ }
+
+ return &kafka.Topic{Name: string(endpoint)}, nil
}
func (ap *AdapterProxy) sendRPC(ctx context.Context, rpc string, toTopic *kafka.Topic, replyToTopic *kafka.Topic,
@@ -69,167 +77,210 @@
func (ap *AdapterProxy) adoptDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
logger.Debugw("adoptDevice", log.Fields{"device-id": device.Id})
rpc := "adopt_device"
- toTopic := ap.getAdapterTopic(device.Adapter)
+ toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+ if err != nil {
+ return nil, err
+ }
args := []*kafka.KVArg{
{Key: "device", Value: device},
}
replyToTopic := ap.getCoreTopic()
ap.deviceTopicRegistered = true
- return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+ logger.Debugw("adoptDevice-send-request", log.Fields{"device-id": device.Id, "deviceType": device.Type, "serialNumber": device.SerialNumber})
+ return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
// disableDevice invokes disable device rpc
func (ap *AdapterProxy) disableDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
logger.Debugw("disableDevice", log.Fields{"device-id": device.Id})
rpc := "disable_device"
- toTopic := ap.getAdapterTopic(device.Adapter)
+ toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+ if err != nil {
+ return nil, err
+ }
args := []*kafka.KVArg{
{Key: "device", Value: device},
}
replyToTopic := ap.getCoreTopic()
- return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+ return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
// reEnableDevice invokes reenable device rpc
func (ap *AdapterProxy) reEnableDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
logger.Debugw("reEnableDevice", log.Fields{"device-id": device.Id})
rpc := "reenable_device"
- toTopic := ap.getAdapterTopic(device.Adapter)
+ toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+ if err != nil {
+ return nil, err
+ }
args := []*kafka.KVArg{
{Key: "device", Value: device},
}
replyToTopic := ap.getCoreTopic()
- return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+ return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
// rebootDevice invokes reboot device rpc
func (ap *AdapterProxy) rebootDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
logger.Debugw("rebootDevice", log.Fields{"device-id": device.Id})
rpc := "reboot_device"
- toTopic := ap.getAdapterTopic(device.Adapter)
+ toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+ if err != nil {
+ return nil, err
+ }
args := []*kafka.KVArg{
{Key: "device", Value: device},
}
replyToTopic := ap.getCoreTopic()
- return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+ return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
// deleteDevice invokes delete device rpc
func (ap *AdapterProxy) deleteDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
logger.Debugw("deleteDevice", log.Fields{"device-id": device.Id})
rpc := "delete_device"
- toTopic := ap.getAdapterTopic(device.Adapter)
+ toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+ if err != nil {
+ return nil, err
+ }
args := []*kafka.KVArg{
{Key: "device", Value: device},
}
replyToTopic := ap.getCoreTopic()
- return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+ return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
// getOfpDeviceInfo invokes get ofp device info rpc
func (ap *AdapterProxy) getOfpDeviceInfo(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
logger.Debugw("getOfpDeviceInfo", log.Fields{"device-id": device.Id})
rpc := "get_ofp_device_info"
- toTopic := ap.getAdapterTopic(device.Adapter)
+ toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+ if err != nil {
+ return nil, err
+ }
args := []*kafka.KVArg{
{Key: "device", Value: device},
}
replyToTopic := ap.getCoreTopic()
- return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+ return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
// getOfpPortInfo invokes get ofp port info rpc
func (ap *AdapterProxy) getOfpPortInfo(ctx context.Context, device *voltha.Device, portNo uint32) (chan *kafka.RpcResponse, error) {
logger.Debugw("getOfpPortInfo", log.Fields{"device-id": device.Id, "port-no": portNo})
- toTopic := ap.getAdapterTopic(device.Adapter)
+ toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+ if err != nil {
+ return nil, err
+ }
args := []*kafka.KVArg{
{Key: "device", Value: device},
{Key: "port_no", Value: &ic.IntType{Val: int64(portNo)}},
}
replyToTopic := ap.getCoreTopic()
- return ap.sendRPC(ctx, "get_ofp_port_info", &toTopic, &replyToTopic, true, device.Id, args...)
+ return ap.sendRPC(ctx, "get_ofp_port_info", toTopic, &replyToTopic, true, device.Id, args...)
}
// reconcileDevice invokes reconcile device rpc
func (ap *AdapterProxy) reconcileDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
logger.Debugw("reconcileDevice", log.Fields{"device-id": device.Id})
rpc := "reconcile_device"
- toTopic := ap.getAdapterTopic(device.Adapter)
+ toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+ if err != nil {
+ return nil, err
+ }
args := []*kafka.KVArg{
{Key: "device", Value: device},
}
replyToTopic := ap.getCoreTopic()
- return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+ return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
// downloadImage invokes download image rpc
func (ap *AdapterProxy) downloadImage(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
logger.Debugw("downloadImage", log.Fields{"device-id": device.Id, "image": download.Name})
rpc := "download_image"
- toTopic := ap.getAdapterTopic(device.Adapter)
+ toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+ if err != nil {
+ return nil, err
+ }
args := []*kafka.KVArg{
{Key: "device", Value: device},
{Key: "request", Value: download},
}
replyToTopic := ap.getCoreTopic()
- return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+ return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
// getImageDownloadStatus invokes get image download status rpc
func (ap *AdapterProxy) getImageDownloadStatus(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
logger.Debugw("getImageDownloadStatus", log.Fields{"device-id": device.Id, "image": download.Name})
rpc := "get_image_download_status"
- toTopic := ap.getAdapterTopic(device.Adapter)
+ toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+ if err != nil {
+ return nil, err
+ }
args := []*kafka.KVArg{
{Key: "device", Value: device},
{Key: "request", Value: download},
}
replyToTopic := ap.getCoreTopic()
- return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+ return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
// cancelImageDownload invokes cancel image download rpc
func (ap *AdapterProxy) cancelImageDownload(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
logger.Debugw("cancelImageDownload", log.Fields{"device-id": device.Id, "image": download.Name})
rpc := "cancel_image_download"
- toTopic := ap.getAdapterTopic(device.Adapter)
+ toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+ if err != nil {
+ return nil, err
+ }
args := []*kafka.KVArg{
{Key: "device", Value: device},
{Key: "request", Value: download},
}
replyToTopic := ap.getCoreTopic()
- return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+ return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
// activateImageUpdate invokes activate image update rpc
func (ap *AdapterProxy) activateImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
logger.Debugw("activateImageUpdate", log.Fields{"device-id": device.Id, "image": download.Name})
rpc := "activate_image_update"
- toTopic := ap.getAdapterTopic(device.Adapter)
+ toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+ if err != nil {
+ return nil, err
+ }
args := []*kafka.KVArg{
{Key: "device", Value: device},
{Key: "request", Value: download},
}
replyToTopic := ap.getCoreTopic()
- return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+ return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
// revertImageUpdate invokes revert image update rpc
func (ap *AdapterProxy) revertImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
logger.Debugw("revertImageUpdate", log.Fields{"device-id": device.Id, "image": download.Name})
rpc := "revert_image_update"
- toTopic := ap.getAdapterTopic(device.Adapter)
+ toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+ if err != nil {
+ return nil, err
+ }
args := []*kafka.KVArg{
{Key: "device", Value: device},
{Key: "request", Value: download},
}
replyToTopic := ap.getCoreTopic()
- return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+ return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
func (ap *AdapterProxy) packetOut(ctx context.Context, deviceType string, deviceID string, outPort uint32, packet *openflow_13.OfpPacketOut) (chan *kafka.RpcResponse, error) {
logger.Debugw("packetOut", log.Fields{"device-id": deviceID, "device-type": deviceType, "out-port": outPort})
- toTopic := ap.getAdapterTopic(deviceType)
+ toTopic, err := ap.getAdapterTopic(deviceID, deviceType)
+ if err != nil {
+ return nil, err
+ }
rpc := "receive_packet_out"
args := []*kafka.KVArg{
{Key: "deviceId", Value: &ic.StrType{Val: deviceID}},
@@ -237,13 +288,16 @@
{Key: "packet", Value: packet},
}
replyToTopic := ap.getCoreTopic()
- return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, deviceID, args...)
+ return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, deviceID, args...)
}
// updateFlowsBulk invokes update flows bulk rpc
func (ap *AdapterProxy) updateFlowsBulk(ctx context.Context, device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, flowMetadata *voltha.FlowMetadata) (chan *kafka.RpcResponse, error) {
logger.Debugw("updateFlowsBulk", log.Fields{"device-id": device.Id, "flow-count": len(flows.Items), "group-count": len(groups.Items), "flow-metadata": flowMetadata})
- toTopic := ap.getAdapterTopic(device.Adapter)
+ toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+ if err != nil {
+ return nil, err
+ }
rpc := "update_flows_bulk"
args := []*kafka.KVArg{
{Key: "device", Value: device},
@@ -252,7 +306,7 @@
{Key: "flow_metadata", Value: flowMetadata},
}
replyToTopic := ap.getCoreTopic()
- return ap.sendRPC(context.TODO(), rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+ return ap.sendRPC(context.TODO(), rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
// updateFlowsIncremental invokes update flows incremental rpc
@@ -266,7 +320,10 @@
"group-to-delete-count": len(groupChanges.ToRemove.Items),
"group-to-update-count": len(groupChanges.ToUpdate.Items),
})
- toTopic := ap.getAdapterTopic(device.Adapter)
+ toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+ if err != nil {
+ return nil, err
+ }
rpc := "update_flows_incrementally"
args := []*kafka.KVArg{
{Key: "device", Value: device},
@@ -275,83 +332,101 @@
{Key: "flow_metadata", Value: flowMetadata},
}
replyToTopic := ap.getCoreTopic()
- return ap.sendRPC(context.TODO(), rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+ return ap.sendRPC(context.TODO(), rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
// updatePmConfigs invokes update pm configs rpc
func (ap *AdapterProxy) updatePmConfigs(ctx context.Context, device *voltha.Device, pmConfigs *voltha.PmConfigs) (chan *kafka.RpcResponse, error) {
logger.Debugw("updatePmConfigs", log.Fields{"device-id": device.Id, "pm-configs-id": pmConfigs.Id})
- toTopic := ap.getAdapterTopic(device.Adapter)
+ toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+ if err != nil {
+ return nil, err
+ }
rpc := "Update_pm_config"
args := []*kafka.KVArg{
{Key: "device", Value: device},
{Key: "pm_configs", Value: pmConfigs},
}
replyToTopic := ap.getCoreTopic()
- return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+ return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
// simulateAlarm invokes simulate alarm rpc
func (ap *AdapterProxy) simulateAlarm(ctx context.Context, device *voltha.Device, simulateReq *voltha.SimulateAlarmRequest) (chan *kafka.RpcResponse, error) {
logger.Debugw("simulateAlarm", log.Fields{"device-id": device.Id, "simulate-req-id": simulateReq.Id})
rpc := "simulate_alarm"
- toTopic := ap.getAdapterTopic(device.Adapter)
+ toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+ if err != nil {
+ return nil, err
+ }
args := []*kafka.KVArg{
{Key: "device", Value: device},
{Key: "request", Value: simulateReq},
}
replyToTopic := ap.getCoreTopic()
ap.deviceTopicRegistered = true
- return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+ return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
func (ap *AdapterProxy) disablePort(ctx context.Context, device *voltha.Device, port *voltha.Port) (chan *kafka.RpcResponse, error) {
logger.Debugw("disablePort", log.Fields{"device-id": device.Id, "port-no": port.PortNo})
rpc := "disable_port"
- toTopic := ap.getAdapterTopic(device.Adapter)
+ toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+ if err != nil {
+ return nil, err
+ }
args := []*kafka.KVArg{
{Key: "deviceId", Value: &ic.StrType{Val: device.Id}},
{Key: "port", Value: port},
}
replyToTopic := ap.getCoreTopic()
- return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+ return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
func (ap *AdapterProxy) enablePort(ctx context.Context, device *voltha.Device, port *voltha.Port) (chan *kafka.RpcResponse, error) {
logger.Debugw("enablePort", log.Fields{"device-id": device.Id, "port-no": port.PortNo})
rpc := "enable_port"
- toTopic := ap.getAdapterTopic(device.Adapter)
+ toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+ if err != nil {
+ return nil, err
+ }
args := []*kafka.KVArg{
{Key: "deviceId", Value: &ic.StrType{Val: device.Id}},
{Key: "port", Value: port},
}
replyToTopic := ap.getCoreTopic()
- return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+ return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
// childDeviceLost invokes child device_lost rpc
-func (ap *AdapterProxy) childDeviceLost(ctx context.Context, deviceType string, pDeviceID string, pPortNo uint32, onuID uint32) (chan *kafka.RpcResponse, error) {
- logger.Debugw("childDeviceLost", log.Fields{"parent-device-id": pDeviceID, "parent-port-no": pPortNo, "onu-id": onuID})
+func (ap *AdapterProxy) childDeviceLost(ctx context.Context, deviceType string, deviceID string, pPortNo uint32, onuID uint32) (chan *kafka.RpcResponse, error) {
+ logger.Debugw("childDeviceLost", log.Fields{"device-id": deviceID, "parent-port-no": pPortNo, "onu-id": onuID})
rpc := "child_device_lost"
- toTopic := ap.getAdapterTopic(deviceType)
+ toTopic, err := ap.getAdapterTopic(deviceID, deviceType)
+ if err != nil {
+ return nil, err
+ }
args := []*kafka.KVArg{
- {Key: "pDeviceId", Value: &ic.StrType{Val: pDeviceID}},
+ {Key: "pDeviceId", Value: &ic.StrType{Val: deviceID}},
{Key: "pPortNo", Value: &ic.IntType{Val: int64(pPortNo)}},
{Key: "onuID", Value: &ic.IntType{Val: int64(onuID)}},
}
replyToTopic := ap.getCoreTopic()
- return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, pDeviceID, args...)
+ return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, deviceID, args...)
}
func (ap *AdapterProxy) startOmciTest(ctx context.Context, device *voltha.Device, omcitestrequest *voltha.OmciTestRequest) (chan *kafka.RpcResponse, error) {
logger.Debugw("Omci_test_Request_adapter_proxy", log.Fields{"device": device, "omciTestRequest": omcitestrequest})
rpc := "start_omci_test"
- toTopic := ap.getAdapterTopic(device.Adapter)
+ toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+ if err != nil {
+ return nil, err
+ }
// Use a device specific topic as we are the only core handling requests for this device
replyToTopic := ap.getCoreTopic()
// TODO: Perhaps this should have used omcitestrequest.uuid as the second argument rather
// than including the whole request, which is (deviceid, uuid)
- return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id,
+ return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id,
&kafka.KVArg{Key: "device", Value: device},
&kafka.KVArg{Key: "omcitestrequest", Value: omcitestrequest})
}
diff --git a/rw_core/core/adapter_proxy_test.go b/rw_core/core/adapter_proxy_test.go
index 8784ff2..718cc30 100755
--- a/rw_core/core/adapter_proxy_test.go
+++ b/rw_core/core/adapter_proxy_test.go
@@ -24,7 +24,7 @@
com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
- lm "github.com/opencord/voltha-lib-go/v3/pkg/mocks"
+ mock_kafka "github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka"
ic "github.com/opencord/voltha-protos/v3/go/inter_container"
of "github.com/opencord/voltha-protos/v3/go/openflow_13"
"github.com/opencord/voltha-protos/v3/go/voltha"
@@ -60,7 +60,7 @@
var err error
// Create the KV client
- kc = lm.NewKafkaClient()
+ kc = mock_kafka.NewKafkaClient()
// Setup core inter-container proxy and core request handler
coreKafkaICProxy = kafka.NewInterContainerProxy(
@@ -98,7 +98,7 @@
}
func TestCreateAdapterProxy(t *testing.T) {
- ap := NewAdapterProxy(coreKafkaICProxy, coreName)
+ ap := NewAdapterProxy(coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager())
assert.NotNil(t, ap)
}
@@ -119,7 +119,7 @@
func testSimpleRequests(t *testing.T) {
type simpleRequest func(context.Context, *voltha.Device) (chan *kafka.RpcResponse, error)
- ap := NewAdapterProxy(coreKafkaICProxy, coreName)
+ ap := NewAdapterProxy(coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager())
simpleRequests := []simpleRequest{
ap.adoptDevice,
ap.disableDevice,
@@ -162,7 +162,7 @@
}
func testGetSwitchCapabilityFromAdapter(t *testing.T) {
- ap := NewAdapterProxy(coreKafkaICProxy, coreName)
+ ap := NewAdapterProxy(coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager())
d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
@@ -179,7 +179,7 @@
}
func testGetPortInfoFromAdapter(t *testing.T) {
- ap := NewAdapterProxy(coreKafkaICProxy, coreName)
+ ap := NewAdapterProxy(coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager())
d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
@@ -197,7 +197,7 @@
}
func testPacketOut(t *testing.T) {
- ap := NewAdapterProxy(coreKafkaICProxy, coreName)
+ ap := NewAdapterProxy(coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager())
d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
outPort := uint32(1)
packet, err := getRandomBytes(50)
@@ -211,7 +211,7 @@
}
func testFlowUpdates(t *testing.T) {
- ap := NewAdapterProxy(coreKafkaICProxy, coreName)
+ ap := NewAdapterProxy(coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager())
d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
_, err := ap.updateFlowsBulk(context.Background(), d, &voltha.Flows{}, &voltha.FlowGroups{}, &voltha.FlowMetadata{})
assert.Nil(t, err)
@@ -226,7 +226,7 @@
}
func testPmUpdates(t *testing.T) {
- ap := NewAdapterProxy(coreKafkaICProxy, coreName)
+ ap := NewAdapterProxy(coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager())
d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
@@ -236,7 +236,7 @@
assert.Nil(t, err)
}
-func TestSuite(t *testing.T) {
+func TestSuiteAdapterProxy(t *testing.T) {
//1. Test the simple requests first
testSimpleRequests(t)
diff --git a/rw_core/core/common_test.go b/rw_core/core/common_test.go
index 9771619..7591c18 100644
--- a/rw_core/core/common_test.go
+++ b/rw_core/core/common_test.go
@@ -30,7 +30,7 @@
"github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
- lm "github.com/opencord/voltha-lib-go/v3/pkg/mocks"
+ mock_etcd "github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd"
"github.com/opencord/voltha-protos/v3/go/voltha"
"github.com/phayes/freeport"
"google.golang.org/grpc/codes"
@@ -75,7 +75,7 @@
}
//startEmbeddedEtcdServer creates and starts an Embedded etcd server locally.
-func startEmbeddedEtcdServer(configName, storageDir, logLevel string) (*lm.EtcdServer, int, error) {
+func startEmbeddedEtcdServer(configName, storageDir, logLevel string) (*mock_etcd.EtcdServer, int, error) {
kvClientPort, err := freeport.GetFreePort()
if err != nil {
return nil, 0, err
@@ -84,14 +84,14 @@
if err != nil {
return nil, 0, err
}
- etcdServer := lm.StartEtcdServer(lm.MKConfig(configName, kvClientPort, peerPort, storageDir, logLevel))
+ etcdServer := mock_etcd.StartEtcdServer(mock_etcd.MKConfig(configName, kvClientPort, peerPort, storageDir, logLevel))
if etcdServer == nil {
return nil, 0, status.Error(codes.Internal, "Embedded server failed to start")
}
return etcdServer, kvClientPort, nil
}
-func stopEmbeddedEtcdServer(server *lm.EtcdServer) {
+func stopEmbeddedEtcdServer(server *mock_etcd.EtcdServer) {
if server != nil {
server.Stop()
}
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 69391fb..dccd271 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -254,15 +254,16 @@
// First figure out which adapter will handle this device type. We do it at this stage as allow devices to be
// pre-provisioned with the required adapter not registered. At this stage, since we need to communicate
// with the adapter then we need to know the adapter that will handle this request
- adapterName, err := agent.adapterMgr.getAdapterName(cloned.Type)
+ adapterName, err := agent.adapterMgr.getAdapterType(cloned.Type)
if err != nil {
return err
}
cloned.Adapter = adapterName
if cloned.AdminState == voltha.AdminState_ENABLED {
- logger.Debugw("device-already-enabled", log.Fields{"device-id": agent.deviceID})
- return nil
+ logger.Warnw("device-already-enabled", log.Fields{"device-id": agent.deviceID})
+ err = status.Error(codes.FailedPrecondition, fmt.Sprintf("cannot-enable-an-already-enabled-device: %s ", cloned.Id))
+ return err
}
if cloned.AdminState == voltha.AdminState_DELETED {
@@ -1665,14 +1666,18 @@
}
device := agent.getDeviceWithoutLock()
- adapterName, err := agent.adapterMgr.getAdapterName(device.Type)
- if err != nil {
- agent.requestQueue.RequestComplete()
- return nil, err
+
+ if device.Adapter == "" {
+ adapterName, err := agent.adapterMgr.getAdapterType(device.Type)
+ if err != nil {
+ agent.requestQueue.RequestComplete()
+ return nil, err
+ }
+
+ device.Adapter = adapterName
}
// Send request to the adapter
- device.Adapter = adapterName
ch, err := agent.adapterProxy.startOmciTest(ctx, device, omcitestrequest)
agent.requestQueue.RequestComplete()
if err != nil {
diff --git a/rw_core/core/device_agent_test.go b/rw_core/core/device_agent_test.go
index 4f1506f..f8fb810 100755
--- a/rw_core/core/device_agent_test.go
+++ b/rw_core/core/device_agent_test.go
@@ -21,7 +21,8 @@
"github.com/opencord/voltha-go/rw_core/config"
com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
- lm "github.com/opencord/voltha-lib-go/v3/pkg/mocks"
+ mock_etcd "github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd"
+ mock_kafka "github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka"
ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
"github.com/opencord/voltha-protos/v3/go/voltha"
"github.com/phayes/freeport"
@@ -35,7 +36,7 @@
)
type DATest struct {
- etcdServer *lm.EtcdServer
+ etcdServer *mock_etcd.EtcdServer
core *Core
kClient kafka.Client
kvClientPort int
@@ -57,7 +58,7 @@
logger.Fatal(err)
}
// Create the kafka client
- test.kClient = lm.NewKafkaClient()
+ test.kClient = mock_kafka.NewKafkaClient()
test.oltAdapterName = "olt_adapter_mock"
test.onuAdapterName = "onu_adapter_mock"
test.coreInstanceID = "rw-da-test"
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index afe84e8..18593d8 100755
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -56,12 +56,15 @@
}
func newDeviceManager(core *Core) *DeviceManager {
+
+ endpointManager := kafka.NewEndpointManager(&core.backend)
+
var deviceMgr DeviceManager
deviceMgr.core = core
deviceMgr.exitChannel = make(chan int, 1)
deviceMgr.rootDevices = make(map[string]bool)
deviceMgr.kafkaICProxy = core.kmp
- deviceMgr.adapterProxy = NewAdapterProxy(core.kmp, core.config.CorePairTopic)
+ deviceMgr.adapterProxy = NewAdapterProxy(core.kmp, core.config.CorePairTopic, endpointManager)
deviceMgr.coreInstanceID = core.instanceID
deviceMgr.clusterDataProxy = core.clusterDataProxy
deviceMgr.adapterMgr = core.adapterMgr
@@ -186,7 +189,6 @@
} else {
res = status.Errorf(codes.NotFound, "%s", id.Id)
}
-
sendResponse(ctx, ch, res)
}
@@ -601,7 +603,8 @@
// adapterRestarted is invoked whenever an adapter is restarted
func (dMgr *DeviceManager) adapterRestarted(ctx context.Context, adapter *voltha.Adapter) error {
- logger.Debugw("adapter-restarted", log.Fields{"adapter": adapter.Id})
+ logger.Debugw("adapter-restarted", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
+ "currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint})
// Let's reconcile the device managed by this Core only
if len(dMgr.rootDevices) == 0 {
diff --git a/rw_core/core/grpc_nbi_api_handler_test.go b/rw_core/core/grpc_nbi_api_handler_test.go
index 0490c5f..e54c14c 100755
--- a/rw_core/core/grpc_nbi_api_handler_test.go
+++ b/rw_core/core/grpc_nbi_api_handler_test.go
@@ -34,7 +34,8 @@
cm "github.com/opencord/voltha-go/rw_core/mocks"
"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
- lm "github.com/opencord/voltha-lib-go/v3/pkg/mocks"
+ mock_etcd "github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd"
+ mock_kafka "github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka"
"github.com/opencord/voltha-lib-go/v3/pkg/version"
ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
"github.com/opencord/voltha-protos/v3/go/voltha"
@@ -45,7 +46,7 @@
)
type NBTest struct {
- etcdServer *lm.EtcdServer
+ etcdServer *mock_etcd.EtcdServer
core *Core
kClient kafka.Client
kvClientPort int
@@ -69,7 +70,7 @@
logger.Fatal(err)
}
// Create the kafka client
- test.kClient = lm.NewKafkaClient()
+ test.kClient = mock_kafka.NewKafkaClient()
test.oltAdapterName = "olt_adapter_mock"
test.onuAdapterName = "onu_adapter_mock"
test.coreInstanceID = "rw-nbi-test"
@@ -114,9 +115,13 @@
// Register the adapter
registrationData := &voltha.Adapter{
- Id: nb.oltAdapterName,
- Vendor: "Voltha-olt",
- Version: version.VersionInfo.Version,
+ Id: nb.oltAdapterName,
+ Vendor: "Voltha-olt",
+ Version: version.VersionInfo.Version,
+ Type: nb.oltAdapterName,
+ CurrentReplica: 1,
+ TotalReplicas: 1,
+ Endpoint: nb.oltAdapterName,
}
types := []*voltha.DeviceType{{Id: nb.oltAdapterName, Adapter: nb.oltAdapterName, AcceptsAddRemoveFlowUpdates: true}}
deviceTypes := &voltha.DeviceTypes{Items: types}
@@ -134,9 +139,13 @@
// Register the adapter
registrationData = &voltha.Adapter{
- Id: nb.onuAdapterName,
- Vendor: "Voltha-onu",
- Version: version.VersionInfo.Version,
+ Id: nb.onuAdapterName,
+ Vendor: "Voltha-onu",
+ Version: version.VersionInfo.Version,
+ Type: nb.onuAdapterName,
+ CurrentReplica: 1,
+ TotalReplicas: 1,
+ Endpoint: nb.onuAdapterName,
}
types = []*voltha.DeviceType{{Id: nb.onuAdapterName, Adapter: nb.onuAdapterName, AcceptsAddRemoveFlowUpdates: true}}
deviceTypes = &voltha.DeviceTypes{Items: types}
@@ -1108,7 +1117,7 @@
assert.Nil(t, err)
}
-func TestSuite1(t *testing.T) {
+func TestSuiteNbiApiHandler(t *testing.T) {
f, err := os.Create("profile.cpu")
if err != nil {
logger.Fatalf("could not create CPU profile: %v\n ", err)
diff --git a/rw_core/core/logical_device_agent_test.go b/rw_core/core/logical_device_agent_test.go
index 175fe06..70d809a 100644
--- a/rw_core/core/logical_device_agent_test.go
+++ b/rw_core/core/logical_device_agent_test.go
@@ -27,7 +27,8 @@
com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
- lm "github.com/opencord/voltha-lib-go/v3/pkg/mocks"
+ mock_etcd "github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd"
+ mock_kafka "github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka"
ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
"github.com/opencord/voltha-protos/v3/go/voltha"
"github.com/phayes/freeport"
@@ -357,7 +358,7 @@
}
type LDATest struct {
- etcdServer *lm.EtcdServer
+ etcdServer *mock_etcd.EtcdServer
core *Core
kClient kafka.Client
kvClientPort int
@@ -380,7 +381,7 @@
logger.Fatal(err)
}
// Create the kafka client
- test.kClient = lm.NewKafkaClient()
+ test.kClient = mock_kafka.NewKafkaClient()
test.oltAdapterName = "olt_adapter_mock"
test.onuAdapterName = "onu_adapter_mock"
test.coreInstanceID = "rw-da-test"
diff --git a/vendor/github.com/buraksezer/consistent/.gitignore b/vendor/github.com/buraksezer/consistent/.gitignore
new file mode 100644
index 0000000..a1338d6
--- /dev/null
+++ b/vendor/github.com/buraksezer/consistent/.gitignore
@@ -0,0 +1,14 @@
+# Binaries for programs and plugins
+*.exe
+*.dll
+*.so
+*.dylib
+
+# Test binary, build with `go test -c`
+*.test
+
+# Output of the go coverage tool, specifically when used with LiteIDE
+*.out
+
+# Project-local glide cache, RE: https://github.com/Masterminds/glide/issues/736
+.glide/
diff --git a/vendor/github.com/buraksezer/consistent/.travis.yml b/vendor/github.com/buraksezer/consistent/.travis.yml
new file mode 100644
index 0000000..4f2ee4d
--- /dev/null
+++ b/vendor/github.com/buraksezer/consistent/.travis.yml
@@ -0,0 +1 @@
+language: go
diff --git a/vendor/github.com/buraksezer/consistent/LICENSE b/vendor/github.com/buraksezer/consistent/LICENSE
new file mode 100644
index 0000000..e470334
--- /dev/null
+++ b/vendor/github.com/buraksezer/consistent/LICENSE
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2018 Burak Sezer
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/vendor/github.com/buraksezer/consistent/README.md b/vendor/github.com/buraksezer/consistent/README.md
new file mode 100644
index 0000000..bde53d1
--- /dev/null
+++ b/vendor/github.com/buraksezer/consistent/README.md
@@ -0,0 +1,235 @@
+consistent
+==========
+[![GoDoc](http://img.shields.io/badge/godoc-reference-blue.svg?style=flat)](https://godoc.org/github.com/buraksezer/consistent) [![Build Status](https://travis-ci.org/buraksezer/consistent.svg?branch=master)](https://travis-ci.org/buraksezer/consistent) [![Coverage](http://gocover.io/_badge/github.com/buraksezer/consistent)](http://gocover.io/github.com/buraksezer/consistent) [![Go Report Card](https://goreportcard.com/badge/github.com/buraksezer/consistent)](https://goreportcard.com/report/github.com/buraksezer/consistent) [![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT) [![Mentioned in Awesome Go](https://awesome.re/mentioned-badge.svg)](https://github.com/avelino/awesome-go)
+
+
+This library provides a consistent hashing function which simultaneously achieves both uniformity and consistency.
+
+For detailed information about the concept, you should take a look at the following resources:
+
+* [Consistent Hashing with Bounded Loads on Google Research Blog](https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html)
+* [Improving load balancing with a new consistent-hashing algorithm on Vimeo Engineering Blog](https://medium.com/vimeo-engineering-blog/improving-load-balancing-with-a-new-consistent-hashing-algorithm-9f1bd75709ed)
+* [Consistent Hashing with Bounded Loads paper on arXiv](https://arxiv.org/abs/1608.01350)
+
+Table of Content
+----------------
+
+- [Overview](#overview)
+- [Install](#install)
+- [Configuration](#configuration)
+- [Usage](#usage)
+- [Benchmarks](#benchmarks)
+- [Examples](#examples)
+
+Overview
+--------
+
+In this package's context, the keys are distributed among partitions and partitions are distributed among members as well.
+
+When you create a new consistent instance or call `Add/Remove`:
+
+* The member's name is hashed and inserted into the hash ring,
+* Average load is calculated by the algorithm defined in the paper,
+* Partitions are distributed among members by hashing partition IDs and none of them exceed the average load.
+
+Average load cannot be exceeded. So if all members are loaded at the maximum while trying to add a new member, it panics.
+
+When you want to locate a key by calling `LocateKey`:
+
+* The key(byte slice) is hashed,
+* The result of the hash is mod by the number of partitions,
+* The result of this modulo - `MOD(hash result, partition count)` - is the partition in which the key will be located,
+* Owner of the partition is already determined before calling `LocateKey`. So it returns the partition owner immediately.
+
+No memory is allocated by `consistent` except hashing when you want to locate a key.
+
+Note that the number of partitions cannot be changed after creation.
+
+Install
+-------
+
+With a correctly configured Go environment:
+
+```
+go get github.com/buraksezer/consistent
+```
+
+You will find some useful usage samples in [examples](https://github.com/buraksezer/consistent/tree/master/_examples) folder.
+
+Configuration
+-------------
+
+```go
+type Config struct {
+ // Hasher is responsible for generating unsigned, 64 bit hash of provided byte slice.
+ Hasher Hasher
+
+ // Keys are distributed among partitions. Prime numbers are good to
+ // distribute keys uniformly. Select a big PartitionCount if you have
+ // too many keys.
+ PartitionCount int
+
+ // Members are replicated on consistent hash ring. This number controls
+ // the number each member is replicated on the ring.
+ ReplicationFactor int
+
+ // Load is used to calculate average load. See the code, the paper and Google's
+ // blog post to learn about it.
+ Load float64
+}
+```
+
+Any hash algorithm can be used as hasher which implements Hasher interface. Please take a look at the *Sample* section for an example.
+
+Usage
+-----
+
+`LocateKey` function finds a member in the cluster for your key:
+```go
+// With a properly configured and initialized consistent instance
+key := []byte("my-key")
+member := c.LocateKey(key)
+```
+It returns a thread-safe copy of the member you added before.
+
+The second most frequently used function is `GetClosestN`.
+
+```go
+// With a properly configured and initialized consistent instance
+
+key := []byte("my-key")
+members, err := c.GetClosestN(key, 2)
+```
+
+This may be useful to find backup nodes to store your key.
+
+Benchmarks
+----------
+On an early 2015 Macbook:
+
+```
+BenchmarkAddRemove-4 100000 22006 ns/op
+BenchmarkLocateKey-4 5000000 252 ns/op
+BenchmarkGetClosestN-4 500000 2974 ns/op
+```
+
+Examples
+--------
+
+The most basic use of consistent package should be like this. For detailed list of functions, [visit godoc.org.](https://godoc.org/github.com/buraksezer/consistent)
+More sample code can be found under [_examples](https://github.com/buraksezer/consistent/tree/master/_examples).
+
+```go
+package main
+
+import (
+ "fmt"
+
+ "github.com/buraksezer/consistent"
+ "github.com/cespare/xxhash"
+)
+
+// In your code, you probably have a custom data type
+// for your cluster members. Just add a String function to implement
+// consistent.Member interface.
+type myMember string
+
+func (m myMember) String() string {
+ return string(m)
+}
+
+// consistent package doesn't provide a default hashing function.
+// You should provide a proper one to distribute keys/members uniformly.
+type hasher struct{}
+
+func (h hasher) Sum64(data []byte) uint64 {
+ // you should use a proper hash function for uniformity.
+ return xxhash.Sum64(data)
+}
+
+func main() {
+ // Create a new consistent instance
+ cfg := consistent.Config{
+ PartitionCount: 7,
+ ReplicationFactor: 20,
+ Load: 1.25,
+ Hasher: hasher{},
+ }
+ c := consistent.New(nil, cfg)
+
+ // Add some members to the consistent hash table.
+ // Add function calculates average load and distributes partitions over members
+ node1 := myMember("node1.olric.com")
+ c.Add(node1)
+
+ node2 := myMember("node2.olric.com")
+ c.Add(node2)
+
+ key := []byte("my-key")
+ // calculates partition id for the given key
+ // partID := hash(key) % partitionCount
+ // the partitions are already distributed among members by Add function.
+ owner := c.LocateKey(key)
+ fmt.Println(owner.String())
+ // Prints node2.olric.com
+}
+```
+
+Another useful example is `_examples/relocation_percentage.go`. It creates a `consistent` object with 8 members and distributes partitions among them. Then adds 9th member,
+here is the result with a proper configuration and hash function:
+
+```
+bloom:consistent burak$ go run _examples/relocation_percentage.go
+partID: 218 moved to node2.olric from node0.olric
+partID: 173 moved to node9.olric from node3.olric
+partID: 225 moved to node7.olric from node0.olric
+partID: 85 moved to node9.olric from node7.olric
+partID: 220 moved to node5.olric from node0.olric
+partID: 33 moved to node9.olric from node5.olric
+partID: 254 moved to node9.olric from node4.olric
+partID: 71 moved to node9.olric from node3.olric
+partID: 236 moved to node9.olric from node2.olric
+partID: 118 moved to node9.olric from node3.olric
+partID: 233 moved to node3.olric from node0.olric
+partID: 50 moved to node9.olric from node4.olric
+partID: 252 moved to node9.olric from node2.olric
+partID: 121 moved to node9.olric from node2.olric
+partID: 259 moved to node9.olric from node4.olric
+partID: 92 moved to node9.olric from node7.olric
+partID: 152 moved to node9.olric from node3.olric
+partID: 105 moved to node9.olric from node2.olric
+
+6% of the partitions are relocated
+```
+
+Moved partition count is highly dependent on your configuration and quailty of hash function. You should modify the configuration to find an optimum set of configurations
+for your system.
+
+`_examples/load_distribution.go` is also useful to understand load distribution. It creates a `consistent` object with 8 members and locates 1M key. It also calculates average
+load which cannot be exceeded by any member. Here is the result:
+
+```
+Maximum key count for a member should be around this: 147602
+member: node2.olric, key count: 100362
+member: node5.olric, key count: 99448
+member: node0.olric, key count: 147735
+member: node3.olric, key count: 103455
+member: node6.olric, key count: 147069
+member: node1.olric, key count: 121566
+member: node4.olric, key count: 147932
+member: node7.olric, key count: 132433
+```
+
+Average load can be calculated by using the following formula:
+
+```
+load := (consistent.AverageLoad() * float64(keyCount)) / float64(config.PartitionCount)
+```
+
+Contributions
+-------------
+Please don't hesitate to fork the project and send a pull request or just e-mail me to ask questions and share ideas.
+
+License
+-------
+MIT License, - see LICENSE for more details.
diff --git a/vendor/github.com/buraksezer/consistent/consistent.go b/vendor/github.com/buraksezer/consistent/consistent.go
new file mode 100644
index 0000000..a1446d6
--- /dev/null
+++ b/vendor/github.com/buraksezer/consistent/consistent.go
@@ -0,0 +1,362 @@
+// Copyright (c) 2018 Burak Sezer
+// All rights reserved.
+//
+// This code is licensed under the MIT License.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files(the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions :
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+// Package consistent provides a consistent hashing function with bounded loads.
+// For more information about the underlying algorithm, please take a look at
+// https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html
+//
+// Example Use:
+// cfg := consistent.Config{
+// PartitionCount: 71,
+// ReplicationFactor: 20,
+// Load: 1.25,
+// Hasher: hasher{},
+// }
+//
+// // Create a new consistent object
+// // You may call this with a list of members
+// // instead of adding them one by one.
+// c := consistent.New(members, cfg)
+//
+// // myMember struct just needs to implement a String method.
+// // New/Add/Remove distributes partitions among members using the algorithm
+// // defined on Google Research Blog.
+// c.Add(myMember)
+//
+// key := []byte("my-key")
+// // LocateKey hashes the key and calculates partition ID with
+// // this modulo operation: MOD(hash result, partition count)
+// // The owner of the partition is already calculated by New/Add/Remove.
+// // LocateKey just returns the member which's responsible for the key.
+// member := c.LocateKey(key)
+//
+package consistent
+
+import (
+ "encoding/binary"
+ "errors"
+ "fmt"
+ "math"
+ "sort"
+ "sync"
+)
+
+var (
+ //ErrInsufficientMemberCount represents an error which means there are not enough members to complete the task.
+ ErrInsufficientMemberCount = errors.New("insufficient member count")
+
+ // ErrMemberNotFound represents an error which means requested member could not be found in consistent hash ring.
+ ErrMemberNotFound = errors.New("member could not be found in ring")
+)
+
+// Hasher is responsible for generating unsigned, 64 bit hash of provided byte slice.
+// Hasher should minimize collisions (generating same hash for different byte slice)
+// and while performance is also important fast functions are preferable (i.e.
+// you can use FarmHash family).
+type Hasher interface {
+ Sum64([]byte) uint64
+}
+
+// Member interface represents a member in consistent hash ring.
+type Member interface {
+ String() string
+}
+
+// Config represents a structure to control consistent package.
+type Config struct {
+ // Hasher is responsible for generating unsigned, 64 bit hash of provided byte slice.
+ Hasher Hasher
+
+ // Keys are distributed among partitions. Prime numbers are good to
+ // distribute keys uniformly. Select a big PartitionCount if you have
+ // too many keys.
+ PartitionCount int
+
+ // Members are replicated on consistent hash ring. This number means that a member
+ // how many times replicated on the ring.
+ ReplicationFactor int
+
+ // Load is used to calculate average load. See the code, the paper and Google's blog post to learn about it.
+ Load float64
+}
+
+// Consistent holds the information about the members of the consistent hash circle.
+type Consistent struct {
+ mu sync.RWMutex
+
+ config Config
+ hasher Hasher
+ sortedSet []uint64
+ partitionCount uint64
+ loads map[string]float64
+ members map[string]*Member
+ partitions map[int]*Member
+ ring map[uint64]*Member
+}
+
+// New creates and returns a new Consistent object.
+func New(members []Member, config Config) *Consistent {
+ c := &Consistent{
+ config: config,
+ members: make(map[string]*Member),
+ partitionCount: uint64(config.PartitionCount),
+ ring: make(map[uint64]*Member),
+ }
+ if config.Hasher == nil {
+ panic("Hasher cannot be nil")
+ }
+ // TODO: Check configuration here
+ c.hasher = config.Hasher
+ for _, member := range members {
+ c.add(member)
+ }
+ if members != nil {
+ c.distributePartitions()
+ }
+ return c
+}
+
+// GetMembers returns a thread-safe copy of members.
+func (c *Consistent) GetMembers() []Member {
+ c.mu.RLock()
+ defer c.mu.RUnlock()
+
+ // Create a thread-safe copy of member list.
+ members := make([]Member, 0, len(c.members))
+ for _, member := range c.members {
+ members = append(members, *member)
+ }
+ return members
+}
+
+// AverageLoad exposes the current average load.
+func (c *Consistent) AverageLoad() float64 {
+ avgLoad := float64(c.partitionCount/uint64(len(c.members))) * c.config.Load
+ return math.Ceil(avgLoad)
+}
+
+func (c *Consistent) distributeWithLoad(partID, idx int, partitions map[int]*Member, loads map[string]float64) {
+ avgLoad := c.AverageLoad()
+ var count int
+ for {
+ count++
+ if count >= len(c.sortedSet) {
+ // User needs to decrease partition count, increase member count or increase load factor.
+ panic("not enough room to distribute partitions")
+ }
+ i := c.sortedSet[idx]
+ member := *c.ring[i]
+ load := loads[member.String()]
+ if load+1 <= avgLoad {
+ partitions[partID] = &member
+ loads[member.String()]++
+ return
+ }
+ idx++
+ if idx >= len(c.sortedSet) {
+ idx = 0
+ }
+ }
+}
+
+func (c *Consistent) distributePartitions() {
+ loads := make(map[string]float64)
+ partitions := make(map[int]*Member)
+
+ bs := make([]byte, 8)
+ for partID := uint64(0); partID < c.partitionCount; partID++ {
+ binary.LittleEndian.PutUint64(bs, partID)
+ key := c.hasher.Sum64(bs)
+ idx := sort.Search(len(c.sortedSet), func(i int) bool {
+ return c.sortedSet[i] >= key
+ })
+ if idx >= len(c.sortedSet) {
+ idx = 0
+ }
+ c.distributeWithLoad(int(partID), idx, partitions, loads)
+ }
+ c.partitions = partitions
+ c.loads = loads
+}
+
+func (c *Consistent) add(member Member) {
+ for i := 0; i < c.config.ReplicationFactor; i++ {
+ key := []byte(fmt.Sprintf("%s%d", member.String(), i))
+ h := c.hasher.Sum64(key)
+ c.ring[h] = &member
+ c.sortedSet = append(c.sortedSet, h)
+ }
+ // sort hashes ascendingly
+ sort.Slice(c.sortedSet, func(i int, j int) bool {
+ return c.sortedSet[i] < c.sortedSet[j]
+ })
+ // Storing member at this map is useful to find backup members of a partition.
+ c.members[member.String()] = &member
+}
+
+// Add adds a new member to the consistent hash circle.
+func (c *Consistent) Add(member Member) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ if _, ok := c.members[member.String()]; ok {
+ // We already have this member. Quit immediately.
+ return
+ }
+ c.add(member)
+ c.distributePartitions()
+}
+
+func (c *Consistent) delSlice(val uint64) {
+ for i := 0; i < len(c.sortedSet); i++ {
+ if c.sortedSet[i] == val {
+ c.sortedSet = append(c.sortedSet[:i], c.sortedSet[i+1:]...)
+ break
+ }
+ }
+}
+
+// Remove removes a member from the consistent hash circle.
+func (c *Consistent) Remove(name string) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ if _, ok := c.members[name]; !ok {
+ // There is no member with that name. Quit immediately.
+ return
+ }
+
+ for i := 0; i < c.config.ReplicationFactor; i++ {
+ key := []byte(fmt.Sprintf("%s%d", name, i))
+ h := c.hasher.Sum64(key)
+ delete(c.ring, h)
+ c.delSlice(h)
+ }
+ delete(c.members, name)
+ if len(c.members) == 0 {
+ // consistent hash ring is empty now. Reset the partition table.
+ c.partitions = make(map[int]*Member)
+ return
+ }
+ c.distributePartitions()
+}
+
+// LoadDistribution exposes load distribution of members.
+func (c *Consistent) LoadDistribution() map[string]float64 {
+ c.mu.RLock()
+ defer c.mu.RUnlock()
+
+ // Create a thread-safe copy
+ res := make(map[string]float64)
+ for member, load := range c.loads {
+ res[member] = load
+ }
+ return res
+}
+
+// FindPartitionID returns partition id for given key.
+func (c *Consistent) FindPartitionID(key []byte) int {
+ hkey := c.hasher.Sum64(key)
+ return int(hkey % c.partitionCount)
+}
+
+// GetPartitionOwner returns the owner of the given partition.
+func (c *Consistent) GetPartitionOwner(partID int) Member {
+ c.mu.RLock()
+ defer c.mu.RUnlock()
+
+ member, ok := c.partitions[partID]
+ if !ok {
+ return nil
+ }
+ // Create a thread-safe copy of member and return it.
+ return *member
+}
+
+// LocateKey finds a home for given key
+func (c *Consistent) LocateKey(key []byte) Member {
+ partID := c.FindPartitionID(key)
+ return c.GetPartitionOwner(partID)
+}
+
+func (c *Consistent) getClosestN(partID, count int) ([]Member, error) {
+ c.mu.RLock()
+ defer c.mu.RUnlock()
+
+ res := []Member{}
+ if count > len(c.members) {
+ return res, ErrInsufficientMemberCount
+ }
+
+ var ownerKey uint64
+ owner := c.GetPartitionOwner(partID)
+ // Hash and sort all the names.
+ keys := []uint64{}
+ kmems := make(map[uint64]*Member)
+ for name, member := range c.members {
+ key := c.hasher.Sum64([]byte(name))
+ if name == owner.String() {
+ ownerKey = key
+ }
+ keys = append(keys, key)
+ kmems[key] = member
+ }
+ sort.Slice(keys, func(i, j int) bool {
+ return keys[i] < keys[j]
+ })
+
+ // Find the key owner
+ idx := 0
+ for idx < len(keys) {
+ if keys[idx] == ownerKey {
+ key := keys[idx]
+ res = append(res, *kmems[key])
+ break
+ }
+ idx++
+ }
+
+ // Find the closest(replica owners) members.
+ for len(res) < count {
+ idx++
+ if idx >= len(keys) {
+ idx = 0
+ }
+ key := keys[idx]
+ res = append(res, *kmems[key])
+ }
+ return res, nil
+}
+
+// GetClosestN returns the closest N member to a key in the hash ring.
+// This may be useful to find members for replication.
+func (c *Consistent) GetClosestN(key []byte, count int) ([]Member, error) {
+ partID := c.FindPartitionID(key)
+ return c.getClosestN(partID, count)
+}
+
+// GetClosestNForPartition returns the closest N member for given partition.
+// This may be useful to find members for replication.
+func (c *Consistent) GetClosestNForPartition(partID, count int) ([]Member, error) {
+ return c.getClosestN(partID, count)
+}
diff --git a/vendor/github.com/cespare/xxhash/LICENSE.txt b/vendor/github.com/cespare/xxhash/LICENSE.txt
new file mode 100644
index 0000000..24b5306
--- /dev/null
+++ b/vendor/github.com/cespare/xxhash/LICENSE.txt
@@ -0,0 +1,22 @@
+Copyright (c) 2016 Caleb Spare
+
+MIT License
+
+Permission is hereby granted, free of charge, to any person obtaining
+a copy of this software and associated documentation files (the
+"Software"), to deal in the Software without restriction, including
+without limitation the rights to use, copy, modify, merge, publish,
+distribute, sublicense, and/or sell copies of the Software, and to
+permit persons to whom the Software is furnished to do so, subject to
+the following conditions:
+
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
diff --git a/vendor/github.com/cespare/xxhash/README.md b/vendor/github.com/cespare/xxhash/README.md
new file mode 100644
index 0000000..0982fd2
--- /dev/null
+++ b/vendor/github.com/cespare/xxhash/README.md
@@ -0,0 +1,50 @@
+# xxhash
+
+[![GoDoc](https://godoc.org/github.com/cespare/xxhash?status.svg)](https://godoc.org/github.com/cespare/xxhash)
+
+xxhash is a Go implementation of the 64-bit
+[xxHash](http://cyan4973.github.io/xxHash/) algorithm, XXH64. This is a
+high-quality hashing algorithm that is much faster than anything in the Go
+standard library.
+
+The API is very small, taking its cue from the other hashing packages in the
+standard library:
+
+ $ go doc github.com/cespare/xxhash !
+ package xxhash // import "github.com/cespare/xxhash"
+
+ Package xxhash implements the 64-bit variant of xxHash (XXH64) as described
+ at http://cyan4973.github.io/xxHash/.
+
+ func New() hash.Hash64
+ func Sum64(b []byte) uint64
+ func Sum64String(s string) uint64
+
+This implementation provides a fast pure-Go implementation and an even faster
+assembly implementation for amd64.
+
+## Benchmarks
+
+Here are some quick benchmarks comparing the pure-Go and assembly
+implementations of Sum64 against another popular Go XXH64 implementation,
+[github.com/OneOfOne/xxhash](https://github.com/OneOfOne/xxhash):
+
+| input size | OneOfOne | cespare (purego) | cespare |
+| --- | --- | --- | --- |
+| 5 B | 416 MB/s | 720 MB/s | 872 MB/s |
+| 100 B | 3980 MB/s | 5013 MB/s | 5252 MB/s |
+| 4 KB | 12727 MB/s | 12999 MB/s | 13026 MB/s |
+| 10 MB | 9879 MB/s | 10775 MB/s | 10913 MB/s |
+
+These numbers were generated with:
+
+```
+$ go test -benchtime 10s -bench '/OneOfOne,'
+$ go test -tags purego -benchtime 10s -bench '/xxhash,'
+$ go test -benchtime 10s -bench '/xxhash,'
+```
+
+## Projects using this package
+
+- [InfluxDB](https://github.com/influxdata/influxdb)
+- [Prometheus](https://github.com/prometheus/prometheus)
diff --git a/vendor/github.com/cespare/xxhash/go.mod b/vendor/github.com/cespare/xxhash/go.mod
new file mode 100644
index 0000000..10605a6
--- /dev/null
+++ b/vendor/github.com/cespare/xxhash/go.mod
@@ -0,0 +1,6 @@
+module github.com/cespare/xxhash
+
+require (
+ github.com/OneOfOne/xxhash v1.2.2
+ github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72
+)
diff --git a/vendor/github.com/cespare/xxhash/go.sum b/vendor/github.com/cespare/xxhash/go.sum
new file mode 100644
index 0000000..f6b5542
--- /dev/null
+++ b/vendor/github.com/cespare/xxhash/go.sum
@@ -0,0 +1,4 @@
+github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
+github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
+github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ=
+github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
diff --git a/vendor/github.com/cespare/xxhash/rotate.go b/vendor/github.com/cespare/xxhash/rotate.go
new file mode 100644
index 0000000..f3eac5e
--- /dev/null
+++ b/vendor/github.com/cespare/xxhash/rotate.go
@@ -0,0 +1,14 @@
+// +build !go1.9
+
+package xxhash
+
+// TODO(caleb): After Go 1.10 comes out, remove this fallback code.
+
+func rol1(x uint64) uint64 { return (x << 1) | (x >> (64 - 1)) }
+func rol7(x uint64) uint64 { return (x << 7) | (x >> (64 - 7)) }
+func rol11(x uint64) uint64 { return (x << 11) | (x >> (64 - 11)) }
+func rol12(x uint64) uint64 { return (x << 12) | (x >> (64 - 12)) }
+func rol18(x uint64) uint64 { return (x << 18) | (x >> (64 - 18)) }
+func rol23(x uint64) uint64 { return (x << 23) | (x >> (64 - 23)) }
+func rol27(x uint64) uint64 { return (x << 27) | (x >> (64 - 27)) }
+func rol31(x uint64) uint64 { return (x << 31) | (x >> (64 - 31)) }
diff --git a/vendor/github.com/cespare/xxhash/rotate19.go b/vendor/github.com/cespare/xxhash/rotate19.go
new file mode 100644
index 0000000..b99612b
--- /dev/null
+++ b/vendor/github.com/cespare/xxhash/rotate19.go
@@ -0,0 +1,14 @@
+// +build go1.9
+
+package xxhash
+
+import "math/bits"
+
+func rol1(x uint64) uint64 { return bits.RotateLeft64(x, 1) }
+func rol7(x uint64) uint64 { return bits.RotateLeft64(x, 7) }
+func rol11(x uint64) uint64 { return bits.RotateLeft64(x, 11) }
+func rol12(x uint64) uint64 { return bits.RotateLeft64(x, 12) }
+func rol18(x uint64) uint64 { return bits.RotateLeft64(x, 18) }
+func rol23(x uint64) uint64 { return bits.RotateLeft64(x, 23) }
+func rol27(x uint64) uint64 { return bits.RotateLeft64(x, 27) }
+func rol31(x uint64) uint64 { return bits.RotateLeft64(x, 31) }
diff --git a/vendor/github.com/cespare/xxhash/xxhash.go b/vendor/github.com/cespare/xxhash/xxhash.go
new file mode 100644
index 0000000..f896bd2
--- /dev/null
+++ b/vendor/github.com/cespare/xxhash/xxhash.go
@@ -0,0 +1,168 @@
+// Package xxhash implements the 64-bit variant of xxHash (XXH64) as described
+// at http://cyan4973.github.io/xxHash/.
+package xxhash
+
+import (
+ "encoding/binary"
+ "hash"
+)
+
+const (
+ prime1 uint64 = 11400714785074694791
+ prime2 uint64 = 14029467366897019727
+ prime3 uint64 = 1609587929392839161
+ prime4 uint64 = 9650029242287828579
+ prime5 uint64 = 2870177450012600261
+)
+
+// NOTE(caleb): I'm using both consts and vars of the primes. Using consts where
+// possible in the Go code is worth a small (but measurable) performance boost
+// by avoiding some MOVQs. Vars are needed for the asm and also are useful for
+// convenience in the Go code in a few places where we need to intentionally
+// avoid constant arithmetic (e.g., v1 := prime1 + prime2 fails because the
+// result overflows a uint64).
+var (
+ prime1v = prime1
+ prime2v = prime2
+ prime3v = prime3
+ prime4v = prime4
+ prime5v = prime5
+)
+
+type xxh struct {
+ v1 uint64
+ v2 uint64
+ v3 uint64
+ v4 uint64
+ total int
+ mem [32]byte
+ n int // how much of mem is used
+}
+
+// New creates a new hash.Hash64 that implements the 64-bit xxHash algorithm.
+func New() hash.Hash64 {
+ var x xxh
+ x.Reset()
+ return &x
+}
+
+func (x *xxh) Reset() {
+ x.n = 0
+ x.total = 0
+ x.v1 = prime1v + prime2
+ x.v2 = prime2
+ x.v3 = 0
+ x.v4 = -prime1v
+}
+
+func (x *xxh) Size() int { return 8 }
+func (x *xxh) BlockSize() int { return 32 }
+
+// Write adds more data to x. It always returns len(b), nil.
+func (x *xxh) Write(b []byte) (n int, err error) {
+ n = len(b)
+ x.total += len(b)
+
+ if x.n+len(b) < 32 {
+ // This new data doesn't even fill the current block.
+ copy(x.mem[x.n:], b)
+ x.n += len(b)
+ return
+ }
+
+ if x.n > 0 {
+ // Finish off the partial block.
+ copy(x.mem[x.n:], b)
+ x.v1 = round(x.v1, u64(x.mem[0:8]))
+ x.v2 = round(x.v2, u64(x.mem[8:16]))
+ x.v3 = round(x.v3, u64(x.mem[16:24]))
+ x.v4 = round(x.v4, u64(x.mem[24:32]))
+ b = b[32-x.n:]
+ x.n = 0
+ }
+
+ if len(b) >= 32 {
+ // One or more full blocks left.
+ b = writeBlocks(x, b)
+ }
+
+ // Store any remaining partial block.
+ copy(x.mem[:], b)
+ x.n = len(b)
+
+ return
+}
+
+func (x *xxh) Sum(b []byte) []byte {
+ s := x.Sum64()
+ return append(
+ b,
+ byte(s>>56),
+ byte(s>>48),
+ byte(s>>40),
+ byte(s>>32),
+ byte(s>>24),
+ byte(s>>16),
+ byte(s>>8),
+ byte(s),
+ )
+}
+
+func (x *xxh) Sum64() uint64 {
+ var h uint64
+
+ if x.total >= 32 {
+ v1, v2, v3, v4 := x.v1, x.v2, x.v3, x.v4
+ h = rol1(v1) + rol7(v2) + rol12(v3) + rol18(v4)
+ h = mergeRound(h, v1)
+ h = mergeRound(h, v2)
+ h = mergeRound(h, v3)
+ h = mergeRound(h, v4)
+ } else {
+ h = x.v3 + prime5
+ }
+
+ h += uint64(x.total)
+
+ i, end := 0, x.n
+ for ; i+8 <= end; i += 8 {
+ k1 := round(0, u64(x.mem[i:i+8]))
+ h ^= k1
+ h = rol27(h)*prime1 + prime4
+ }
+ if i+4 <= end {
+ h ^= uint64(u32(x.mem[i:i+4])) * prime1
+ h = rol23(h)*prime2 + prime3
+ i += 4
+ }
+ for i < end {
+ h ^= uint64(x.mem[i]) * prime5
+ h = rol11(h) * prime1
+ i++
+ }
+
+ h ^= h >> 33
+ h *= prime2
+ h ^= h >> 29
+ h *= prime3
+ h ^= h >> 32
+
+ return h
+}
+
+func u64(b []byte) uint64 { return binary.LittleEndian.Uint64(b) }
+func u32(b []byte) uint32 { return binary.LittleEndian.Uint32(b) }
+
+func round(acc, input uint64) uint64 {
+ acc += input * prime2
+ acc = rol31(acc)
+ acc *= prime1
+ return acc
+}
+
+func mergeRound(acc, val uint64) uint64 {
+ val = round(0, val)
+ acc ^= val
+ acc = acc*prime1 + prime4
+ return acc
+}
diff --git a/vendor/github.com/cespare/xxhash/xxhash_amd64.go b/vendor/github.com/cespare/xxhash/xxhash_amd64.go
new file mode 100644
index 0000000..d617652
--- /dev/null
+++ b/vendor/github.com/cespare/xxhash/xxhash_amd64.go
@@ -0,0 +1,12 @@
+// +build !appengine
+// +build gc
+// +build !purego
+
+package xxhash
+
+// Sum64 computes the 64-bit xxHash digest of b.
+//
+//go:noescape
+func Sum64(b []byte) uint64
+
+func writeBlocks(x *xxh, b []byte) []byte
diff --git a/vendor/github.com/cespare/xxhash/xxhash_amd64.s b/vendor/github.com/cespare/xxhash/xxhash_amd64.s
new file mode 100644
index 0000000..757f201
--- /dev/null
+++ b/vendor/github.com/cespare/xxhash/xxhash_amd64.s
@@ -0,0 +1,233 @@
+// +build !appengine
+// +build gc
+// +build !purego
+
+#include "textflag.h"
+
+// Register allocation:
+// AX h
+// CX pointer to advance through b
+// DX n
+// BX loop end
+// R8 v1, k1
+// R9 v2
+// R10 v3
+// R11 v4
+// R12 tmp
+// R13 prime1v
+// R14 prime2v
+// R15 prime4v
+
+// round reads from and advances the buffer pointer in CX.
+// It assumes that R13 has prime1v and R14 has prime2v.
+#define round(r) \
+ MOVQ (CX), R12 \
+ ADDQ $8, CX \
+ IMULQ R14, R12 \
+ ADDQ R12, r \
+ ROLQ $31, r \
+ IMULQ R13, r
+
+// mergeRound applies a merge round on the two registers acc and val.
+// It assumes that R13 has prime1v, R14 has prime2v, and R15 has prime4v.
+#define mergeRound(acc, val) \
+ IMULQ R14, val \
+ ROLQ $31, val \
+ IMULQ R13, val \
+ XORQ val, acc \
+ IMULQ R13, acc \
+ ADDQ R15, acc
+
+// func Sum64(b []byte) uint64
+TEXT ·Sum64(SB), NOSPLIT, $0-32
+ // Load fixed primes.
+ MOVQ ·prime1v(SB), R13
+ MOVQ ·prime2v(SB), R14
+ MOVQ ·prime4v(SB), R15
+
+ // Load slice.
+ MOVQ b_base+0(FP), CX
+ MOVQ b_len+8(FP), DX
+ LEAQ (CX)(DX*1), BX
+
+ // The first loop limit will be len(b)-32.
+ SUBQ $32, BX
+
+ // Check whether we have at least one block.
+ CMPQ DX, $32
+ JLT noBlocks
+
+ // Set up initial state (v1, v2, v3, v4).
+ MOVQ R13, R8
+ ADDQ R14, R8
+ MOVQ R14, R9
+ XORQ R10, R10
+ XORQ R11, R11
+ SUBQ R13, R11
+
+ // Loop until CX > BX.
+blockLoop:
+ round(R8)
+ round(R9)
+ round(R10)
+ round(R11)
+
+ CMPQ CX, BX
+ JLE blockLoop
+
+ MOVQ R8, AX
+ ROLQ $1, AX
+ MOVQ R9, R12
+ ROLQ $7, R12
+ ADDQ R12, AX
+ MOVQ R10, R12
+ ROLQ $12, R12
+ ADDQ R12, AX
+ MOVQ R11, R12
+ ROLQ $18, R12
+ ADDQ R12, AX
+
+ mergeRound(AX, R8)
+ mergeRound(AX, R9)
+ mergeRound(AX, R10)
+ mergeRound(AX, R11)
+
+ JMP afterBlocks
+
+noBlocks:
+ MOVQ ·prime5v(SB), AX
+
+afterBlocks:
+ ADDQ DX, AX
+
+ // Right now BX has len(b)-32, and we want to loop until CX > len(b)-8.
+ ADDQ $24, BX
+
+ CMPQ CX, BX
+ JG fourByte
+
+wordLoop:
+ // Calculate k1.
+ MOVQ (CX), R8
+ ADDQ $8, CX
+ IMULQ R14, R8
+ ROLQ $31, R8
+ IMULQ R13, R8
+
+ XORQ R8, AX
+ ROLQ $27, AX
+ IMULQ R13, AX
+ ADDQ R15, AX
+
+ CMPQ CX, BX
+ JLE wordLoop
+
+fourByte:
+ ADDQ $4, BX
+ CMPQ CX, BX
+ JG singles
+
+ MOVL (CX), R8
+ ADDQ $4, CX
+ IMULQ R13, R8
+ XORQ R8, AX
+
+ ROLQ $23, AX
+ IMULQ R14, AX
+ ADDQ ·prime3v(SB), AX
+
+singles:
+ ADDQ $4, BX
+ CMPQ CX, BX
+ JGE finalize
+
+singlesLoop:
+ MOVBQZX (CX), R12
+ ADDQ $1, CX
+ IMULQ ·prime5v(SB), R12
+ XORQ R12, AX
+
+ ROLQ $11, AX
+ IMULQ R13, AX
+
+ CMPQ CX, BX
+ JL singlesLoop
+
+finalize:
+ MOVQ AX, R12
+ SHRQ $33, R12
+ XORQ R12, AX
+ IMULQ R14, AX
+ MOVQ AX, R12
+ SHRQ $29, R12
+ XORQ R12, AX
+ IMULQ ·prime3v(SB), AX
+ MOVQ AX, R12
+ SHRQ $32, R12
+ XORQ R12, AX
+
+ MOVQ AX, ret+24(FP)
+ RET
+
+// writeBlocks uses the same registers as above except that it uses AX to store
+// the x pointer.
+
+// func writeBlocks(x *xxh, b []byte) []byte
+TEXT ·writeBlocks(SB), NOSPLIT, $0-56
+ // Load fixed primes needed for round.
+ MOVQ ·prime1v(SB), R13
+ MOVQ ·prime2v(SB), R14
+
+ // Load slice.
+ MOVQ b_base+8(FP), CX
+ MOVQ CX, ret_base+32(FP) // initialize return base pointer; see NOTE below
+ MOVQ b_len+16(FP), DX
+ LEAQ (CX)(DX*1), BX
+ SUBQ $32, BX
+
+ // Load vN from x.
+ MOVQ x+0(FP), AX
+ MOVQ 0(AX), R8 // v1
+ MOVQ 8(AX), R9 // v2
+ MOVQ 16(AX), R10 // v3
+ MOVQ 24(AX), R11 // v4
+
+ // We don't need to check the loop condition here; this function is
+ // always called with at least one block of data to process.
+blockLoop:
+ round(R8)
+ round(R9)
+ round(R10)
+ round(R11)
+
+ CMPQ CX, BX
+ JLE blockLoop
+
+ // Copy vN back to x.
+ MOVQ R8, 0(AX)
+ MOVQ R9, 8(AX)
+ MOVQ R10, 16(AX)
+ MOVQ R11, 24(AX)
+
+ // Construct return slice.
+ // NOTE: It's important that we don't construct a slice that has a base
+ // pointer off the end of the original slice, as in Go 1.7+ this will
+ // cause runtime crashes. (See discussion in, for example,
+ // https://github.com/golang/go/issues/16772.)
+ // Therefore, we calculate the length/cap first, and if they're zero, we
+ // keep the old base. This is what the compiler does as well if you
+ // write code like
+ // b = b[len(b):]
+
+ // New length is 32 - (CX - BX) -> BX+32 - CX.
+ ADDQ $32, BX
+ SUBQ CX, BX
+ JZ afterSetBase
+
+ MOVQ CX, ret_base+32(FP)
+
+afterSetBase:
+ MOVQ BX, ret_len+40(FP)
+ MOVQ BX, ret_cap+48(FP) // set cap == len
+
+ RET
diff --git a/vendor/github.com/cespare/xxhash/xxhash_other.go b/vendor/github.com/cespare/xxhash/xxhash_other.go
new file mode 100644
index 0000000..c68d13f
--- /dev/null
+++ b/vendor/github.com/cespare/xxhash/xxhash_other.go
@@ -0,0 +1,75 @@
+// +build !amd64 appengine !gc purego
+
+package xxhash
+
+// Sum64 computes the 64-bit xxHash digest of b.
+func Sum64(b []byte) uint64 {
+ // A simpler version would be
+ // x := New()
+ // x.Write(b)
+ // return x.Sum64()
+ // but this is faster, particularly for small inputs.
+
+ n := len(b)
+ var h uint64
+
+ if n >= 32 {
+ v1 := prime1v + prime2
+ v2 := prime2
+ v3 := uint64(0)
+ v4 := -prime1v
+ for len(b) >= 32 {
+ v1 = round(v1, u64(b[0:8:len(b)]))
+ v2 = round(v2, u64(b[8:16:len(b)]))
+ v3 = round(v3, u64(b[16:24:len(b)]))
+ v4 = round(v4, u64(b[24:32:len(b)]))
+ b = b[32:len(b):len(b)]
+ }
+ h = rol1(v1) + rol7(v2) + rol12(v3) + rol18(v4)
+ h = mergeRound(h, v1)
+ h = mergeRound(h, v2)
+ h = mergeRound(h, v3)
+ h = mergeRound(h, v4)
+ } else {
+ h = prime5
+ }
+
+ h += uint64(n)
+
+ i, end := 0, len(b)
+ for ; i+8 <= end; i += 8 {
+ k1 := round(0, u64(b[i:i+8:len(b)]))
+ h ^= k1
+ h = rol27(h)*prime1 + prime4
+ }
+ if i+4 <= end {
+ h ^= uint64(u32(b[i:i+4:len(b)])) * prime1
+ h = rol23(h)*prime2 + prime3
+ i += 4
+ }
+ for ; i < end; i++ {
+ h ^= uint64(b[i]) * prime5
+ h = rol11(h) * prime1
+ }
+
+ h ^= h >> 33
+ h *= prime2
+ h ^= h >> 29
+ h *= prime3
+ h ^= h >> 32
+
+ return h
+}
+
+func writeBlocks(x *xxh, b []byte) []byte {
+ v1, v2, v3, v4 := x.v1, x.v2, x.v3, x.v4
+ for len(b) >= 32 {
+ v1 = round(v1, u64(b[0:8:len(b)]))
+ v2 = round(v2, u64(b[8:16:len(b)]))
+ v3 = round(v3, u64(b[16:24:len(b)]))
+ v4 = round(v4, u64(b[24:32:len(b)]))
+ b = b[32:len(b):len(b)]
+ }
+ x.v1, x.v2, x.v3, x.v4 = v1, v2, v3, v4
+ return b
+}
diff --git a/vendor/github.com/cespare/xxhash/xxhash_safe.go b/vendor/github.com/cespare/xxhash/xxhash_safe.go
new file mode 100644
index 0000000..dfa15ab
--- /dev/null
+++ b/vendor/github.com/cespare/xxhash/xxhash_safe.go
@@ -0,0 +1,10 @@
+// +build appengine
+
+// This file contains the safe implementations of otherwise unsafe-using code.
+
+package xxhash
+
+// Sum64String computes the 64-bit xxHash digest of s.
+func Sum64String(s string) uint64 {
+ return Sum64([]byte(s))
+}
diff --git a/vendor/github.com/cespare/xxhash/xxhash_unsafe.go b/vendor/github.com/cespare/xxhash/xxhash_unsafe.go
new file mode 100644
index 0000000..d2b64e8
--- /dev/null
+++ b/vendor/github.com/cespare/xxhash/xxhash_unsafe.go
@@ -0,0 +1,30 @@
+// +build !appengine
+
+// This file encapsulates usage of unsafe.
+// xxhash_safe.go contains the safe implementations.
+
+package xxhash
+
+import (
+ "reflect"
+ "unsafe"
+)
+
+// Sum64String computes the 64-bit xxHash digest of s.
+// It may be faster than Sum64([]byte(s)) by avoiding a copy.
+//
+// TODO(caleb): Consider removing this if an optimization is ever added to make
+// it unnecessary: https://golang.org/issue/2205.
+//
+// TODO(caleb): We still have a function call; we could instead write Go/asm
+// copies of Sum64 for strings to squeeze out a bit more speed.
+func Sum64String(s string) uint64 {
+ // See https://groups.google.com/d/msg/golang-nuts/dcjzJy-bSpw/tcZYBzQqAQAJ
+ // for some discussion about this unsafe conversion.
+ var b []byte
+ bh := (*reflect.SliceHeader)(unsafe.Pointer(&b))
+ bh.Data = (*reflect.StringHeader)(unsafe.Pointer(&s)).Data
+ bh.Len = len(s)
+ bh.Cap = len(s)
+ return Sum64(b)
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/adapter_proxy.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/adapter_proxy.go
index 02fa3de..bbae0ed 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/adapter_proxy.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/adapter_proxy.go
@@ -17,6 +17,7 @@
import (
"context"
+ "github.com/opencord/voltha-lib-go/v3/pkg/db"
"time"
"github.com/golang/protobuf/proto"
@@ -32,14 +33,17 @@
kafkaICProxy kafka.InterContainerProxy
adapterTopic string
coreTopic string
+ endpointMgr kafka.EndpointManager
}
-func NewAdapterProxy(kafkaProxy kafka.InterContainerProxy, adapterTopic string, coreTopic string) *AdapterProxy {
- var proxy AdapterProxy
- proxy.kafkaICProxy = kafkaProxy
- proxy.adapterTopic = adapterTopic
- proxy.coreTopic = coreTopic
- logger.Debugw("TOPICS", log.Fields{"core": proxy.coreTopic, "adapter": proxy.adapterTopic})
+func NewAdapterProxy(kafkaProxy kafka.InterContainerProxy, adapterTopic string, coreTopic string, backend *db.Backend) *AdapterProxy {
+ proxy := AdapterProxy{
+ kafkaICProxy: kafkaProxy,
+ adapterTopic: adapterTopic,
+ coreTopic: coreTopic,
+ endpointMgr: kafka.NewEndpointManager(backend),
+ }
+ logger.Debugw("topics", log.Fields{"core": proxy.coreTopic, "adapter": proxy.adapterTopic})
return &proxy
}
@@ -87,7 +91,11 @@
}
// Set up the required rpc arguments
- topic := kafka.Topic{Name: toAdapter}
+ endpoint, err := ap.endpointMgr.GetEndpoint(toDeviceId, toAdapter)
+ if err != nil {
+ return err
+ }
+ topic := kafka.Topic{Name: string(endpoint)}
replyToTopic := kafka.Topic{Name: fromAdapter}
rpc := "process_inter_adapter_message"
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/core_proxy.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/core_proxy.go
index 9582f33..20e1a52 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/core_proxy.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/core_proxy.go
@@ -99,6 +99,28 @@
topic := kafka.Topic{Name: ap.coreTopic}
replyToTopic := ap.getAdapterTopic()
args := make([]*kafka.KVArg, 2)
+
+ if adapter.TotalReplicas == 0 && adapter.CurrentReplica != 0 {
+ log.Fatal("totalReplicas can't be 0, since you're here you have at least one")
+ }
+
+ if adapter.CurrentReplica == 0 && adapter.TotalReplicas != 0 {
+ log.Fatal("currentReplica can't be 0, it has to start from 1")
+ }
+
+ if adapter.CurrentReplica == 0 && adapter.TotalReplicas == 0 {
+ // if the adapter is not setting these fields they default to 0,
+ // in that case it means the adapter is not ready to be scaled and thus it defaults
+ // to a single instance
+ adapter.CurrentReplica = 1
+ adapter.TotalReplicas = 1
+ }
+
+ if adapter.CurrentReplica > adapter.TotalReplicas {
+ log.Fatalf("CurrentReplica (%d) can't be greater than TotalReplicas (%d)",
+ adapter.CurrentReplica, adapter.TotalReplicas)
+ }
+
args[0] = &kafka.KVArg{
Key: "adapter",
Value: adapter,
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/configmanager.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/configmanager.go
index 9f08b0d..724ad32 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/configmanager.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/configmanager.go
@@ -63,10 +63,10 @@
ConfigAttribute string
}
-// ConfigManager is a wrapper over backend to maintain Configuration of voltha components
+// ConfigManager is a wrapper over Backend to maintain Configuration of voltha components
// in kvstore based persistent storage
type ConfigManager struct {
- backend *db.Backend
+ Backend *db.Backend
KvStoreConfigPrefix string
}
@@ -95,7 +95,7 @@
return &ConfigManager{
KvStoreConfigPrefix: defaultkvStoreConfigPath,
- backend: &db.Backend{
+ Backend: &db.Backend{
Client: kvClient,
StoreType: kvStoreType,
Host: kvStoreHost,
@@ -108,12 +108,12 @@
// RetrieveComponentList list the component Names for which loglevel is stored in kvstore
func (c *ConfigManager) RetrieveComponentList(ctx context.Context, configType ConfigType) ([]string, error) {
- data, err := c.backend.List(ctx, c.KvStoreConfigPrefix)
+ data, err := c.Backend.List(ctx, c.KvStoreConfigPrefix)
if err != nil {
return nil, err
}
- // Looping through the data recieved from the backend for config
+ // Looping through the data recieved from the Backend for config
// Trimming and Splitting the required key and value from data and storing as componentName,PackageName and Level
// For Example, recieved key would be <Backend Prefix Path>/<Config Prefix>/<Component Name>/<Config Type>/default and value \"DEBUG\"
// Then in default will be stored as PackageName,componentName as <Component Name> and DEBUG will be stored as value in List struct
@@ -168,14 +168,14 @@
c.changeEventChan = make(chan *ConfigChangeEvent, 1)
- c.kvStoreEventChan = c.cManager.backend.CreateWatch(ctx, key, true)
+ c.kvStoreEventChan = c.cManager.Backend.CreateWatch(ctx, key, true)
go c.processKVStoreWatchEvents()
return c.changeEventChan
}
-// processKVStoreWatchEvents process event channel recieved from the backend for any ChangeType
+// processKVStoreWatchEvents process event channel recieved from the Backend for any ChangeType
// It checks for the EventType is valid or not.For the valid EventTypes creates ConfigChangeEvent and send it on channel
func (c *ComponentConfig) processKVStoreWatchEvents() {
@@ -183,7 +183,7 @@
logger.Debugw("processing-kvstore-event-change", log.Fields{"key-prefix": ccKeyPrefix})
- ccPathPrefix := c.cManager.backend.PathPrefix + ccKeyPrefix + kvStorePathSeparator
+ ccPathPrefix := c.cManager.Backend.PathPrefix + ccKeyPrefix + kvStorePathSeparator
for watchResp := range c.kvStoreEventChan {
@@ -210,7 +210,7 @@
logger.Debugw("retrieving-config", log.Fields{"key": key})
- if kvpair, err := c.cManager.backend.Get(ctx, key); err != nil {
+ if kvpair, err := c.cManager.Backend.Get(ctx, key); err != nil {
return "", err
} else {
if kvpair == nil {
@@ -228,17 +228,17 @@
logger.Debugw("retreiving-list", log.Fields{"key": key})
- data, err := c.cManager.backend.List(ctx, key)
+ data, err := c.cManager.Backend.List(ctx, key)
if err != nil {
return nil, err
}
- // Looping through the data recieved from the backend for the given key
+ // Looping through the data recieved from the Backend for the given key
// Trimming the required key and value from data and storing as key/value pair
// For Example, recieved key would be <Backend Prefix Path>/<Config Prefix>/<Component Name>/<Config Type>/default and value \"DEBUG\"
// Then in default will be stored as key and DEBUG will be stored as value in map[string]string
res := make(map[string]string)
- ccPathPrefix := c.cManager.backend.PathPrefix + kvStorePathSeparator + key + kvStorePathSeparator
+ ccPathPrefix := c.cManager.Backend.PathPrefix + kvStorePathSeparator + key + kvStorePathSeparator
for attr, val := range data {
res[strings.TrimPrefix(attr, ccPathPrefix)] = strings.Trim(fmt.Sprintf("%s", val.Value), "\"")
}
@@ -252,7 +252,7 @@
logger.Debugw("saving-config", log.Fields{"key": key, "value": configValue})
//save the data for update config
- if err := c.cManager.backend.Put(ctx, key, configValue); err != nil {
+ if err := c.cManager.Backend.Put(ctx, key, configValue); err != nil {
return err
}
return nil
@@ -264,7 +264,7 @@
logger.Debugw("deleting-config", log.Fields{"key": key})
//delete the config
- if err := c.cManager.backend.Delete(ctx, key); err != nil {
+ if err := c.cManager.Backend.Delete(ctx, key); err != nil {
return err
}
return nil
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/logcontroller.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/logcontroller.go
index b929c9d..9c36241 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/logcontroller.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/logcontroller.go
@@ -15,7 +15,7 @@
*/
// Package Config provides dynamic logging configuration for specific Voltha component with loglevel lookup
-// from etcd kvstore implemented using backend.
+// from etcd kvstore implemented using Backend.
// Any Voltha component can start utilizing dynamic logging by starting goroutine of StartLogLevelConfigProcessing after
// starting kvClient for the component.
@@ -121,8 +121,8 @@
}
// ProcessLogConfig will first load and apply log config and then start waiting on component config and global config
-// channels for any changes. Event channel will be recieved from backend for valid change type
-// Then data for componentn log config and global log config will be retrieved from backend and stored in updatedLogConfig in precedence order
+// channels for any changes. Event channel will be recieved from Backend for valid change type
+// Then data for componentn log config and global log config will be retrieved from Backend and stored in updatedLogConfig in precedence order
// If any changes in updatedLogConfig will be applied on component
func (c *ComponentLogController) processLogConfig(ctx context.Context) {
@@ -247,7 +247,7 @@
return componentLogConfig, nil
}
-// buildUpdatedLogConfig retrieve the global logConfig and component logConfig from backend
+// buildUpdatedLogConfig retrieve the global logConfig and component logConfig from Backend
// component logConfig stores the log config with precedence order
// For example, If the global logConfig is set and component logConfig is set only for specific package then
// component logConfig is stored with global logConfig and component logConfig of specific package
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/endpoint_manager.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/endpoint_manager.go
new file mode 100644
index 0000000..1258382
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/endpoint_manager.go
@@ -0,0 +1,352 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka
+
+import (
+ "context"
+ "fmt"
+ "github.com/buraksezer/consistent"
+ "github.com/cespare/xxhash"
+ "github.com/golang/protobuf/proto"
+ "github.com/opencord/voltha-lib-go/v3/pkg/db"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ "github.com/opencord/voltha-protos/v3/go/voltha"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "sync"
+)
+
+const (
+ // All the values below can be tuned to get optimal data distribution. The numbers below seems to work well when
+ // supporting 1000-10000 devices and 1 - 20 replicas of a service
+
+ // Keys are distributed among partitions. Prime numbers are good to distribute keys uniformly.
+ DefaultPartitionCount = 1117
+
+ // Represents how many times a node is replicated on the consistent ring.
+ DefaultReplicationFactor = 117
+
+ // Load is used to calculate average load.
+ DefaultLoad = 1.1
+)
+
+type Endpoint string // Endpoint of a service instance. When using kafka, this is the topic name of a service
+type ReplicaID int32 // The replication ID of a service instance
+
+type EndpointManager interface {
+
+ // GetEndpoint is called to get the endpoint to communicate with for a specific device and service type. For
+ // now this will return the topic name
+ GetEndpoint(deviceID string, serviceType string) (Endpoint, error)
+
+ // IsDeviceOwnedByService is invoked when a specific service (service type + replicaNumber) is restarted and
+ // devices owned by that service need to be reconciled
+ IsDeviceOwnedByService(deviceID string, serviceType string, replicaNumber int32) (bool, error)
+
+ // GetReplicaAssignment returns the replica number of the service that owns the deviceID. This is used by the
+ // test only
+ GetReplicaAssignment(deviceID string, serviceType string) (ReplicaID, error)
+}
+
+type service struct {
+ id string // Id of the service. The same id is used for all replicas
+ totalReplicas int32
+ replicas map[ReplicaID]Endpoint
+ consistentRing *consistent.Consistent
+}
+
+type endpointManager struct {
+ partitionCount int
+ replicationFactor int
+ load float64
+ backend *db.Backend
+ services map[string]*service
+ servicesLock sync.RWMutex
+ deviceTypeServiceMap map[string]string
+ deviceTypeServiceMapLock sync.RWMutex
+}
+
+type EndpointManagerOption func(*endpointManager)
+
+func PartitionCount(count int) EndpointManagerOption {
+ return func(args *endpointManager) {
+ args.partitionCount = count
+ }
+}
+
+func ReplicationFactor(replicas int) EndpointManagerOption {
+ return func(args *endpointManager) {
+ args.replicationFactor = replicas
+ }
+}
+
+func Load(load float64) EndpointManagerOption {
+ return func(args *endpointManager) {
+ args.load = load
+ }
+}
+
+func newEndpointManager(backend *db.Backend, opts ...EndpointManagerOption) EndpointManager {
+ tm := &endpointManager{
+ partitionCount: DefaultPartitionCount,
+ replicationFactor: DefaultReplicationFactor,
+ load: DefaultLoad,
+ backend: backend,
+ services: make(map[string]*service),
+ deviceTypeServiceMap: make(map[string]string),
+ }
+
+ for _, option := range opts {
+ option(tm)
+ }
+ return tm
+}
+
+func NewEndpointManager(backend *db.Backend, opts ...EndpointManagerOption) EndpointManager {
+ return newEndpointManager(backend, opts...)
+}
+
+func (ep *endpointManager) GetEndpoint(deviceID string, serviceType string) (Endpoint, error) {
+ logger.Debugw("getting-endpoint", log.Fields{"device-id": deviceID, "service": serviceType})
+ owner, err := ep.getOwner(deviceID, serviceType)
+ if err != nil {
+ return "", err
+ }
+ m, ok := owner.(Member)
+ if !ok {
+ return "", status.Errorf(codes.Aborted, "invalid-member-%v", owner)
+ }
+ endpoint := m.getEndPoint()
+ if endpoint == "" {
+ return "", status.Errorf(codes.Unavailable, "endpoint-not-set-%s", serviceType)
+ }
+ logger.Debugw("returning-endpoint", log.Fields{"device-id": deviceID, "service": serviceType, "endpoint": endpoint})
+ return endpoint, nil
+}
+
+func (ep *endpointManager) IsDeviceOwnedByService(deviceID string, serviceType string, replicaNumber int32) (bool, error) {
+ logger.Debugw("device-ownership", log.Fields{"device-id": deviceID, "service": serviceType, "replica-number": replicaNumber})
+ owner, err := ep.getOwner(deviceID, serviceType)
+ if err != nil {
+ return false, nil
+ }
+ m, ok := owner.(Member)
+ if !ok {
+ return false, status.Errorf(codes.Aborted, "invalid-member-%v", owner)
+ }
+ return m.getReplica() == ReplicaID(replicaNumber), nil
+}
+
+func (ep *endpointManager) GetReplicaAssignment(deviceID string, serviceType string) (ReplicaID, error) {
+ owner, err := ep.getOwner(deviceID, serviceType)
+ if err != nil {
+ return 0, nil
+ }
+ m, ok := owner.(Member)
+ if !ok {
+ return 0, status.Errorf(codes.Aborted, "invalid-member-%v", owner)
+ }
+ return m.getReplica(), nil
+}
+
+func (ep *endpointManager) getOwner(deviceID string, serviceType string) (consistent.Member, error) {
+ serv, dType, err := ep.getServiceAndDeviceType(serviceType)
+ if err != nil {
+ return nil, err
+ }
+ key := ep.makeKey(deviceID, dType, serviceType)
+ return serv.consistentRing.LocateKey(key), nil
+}
+
+func (ep *endpointManager) getServiceAndDeviceType(serviceType string) (*service, string, error) {
+ // Check whether service exist
+ ep.servicesLock.RLock()
+ serv, serviceExist := ep.services[serviceType]
+ ep.servicesLock.RUnlock()
+
+ // Load the service and device types if needed
+ if !serviceExist || serv == nil || int(serv.totalReplicas) != len(serv.consistentRing.GetMembers()) {
+ if err := ep.loadServices(); err != nil {
+ return nil, "", err
+ }
+
+ // Check whether the service exists now
+ ep.servicesLock.RLock()
+ serv, serviceExist = ep.services[serviceType]
+ ep.servicesLock.RUnlock()
+ if !serviceExist || serv == nil || int(serv.totalReplicas) != len(serv.consistentRing.GetMembers()) {
+ return nil, "", status.Errorf(codes.NotFound, "service-%s", serviceType)
+ }
+ }
+
+ ep.deviceTypeServiceMapLock.RLock()
+ defer ep.deviceTypeServiceMapLock.RUnlock()
+ for dType, sType := range ep.deviceTypeServiceMap {
+ if sType == serviceType {
+ return serv, dType, nil
+ }
+ }
+ return nil, "", status.Errorf(codes.NotFound, "service-%s", serviceType)
+}
+
+func (ep *endpointManager) getConsistentConfig() consistent.Config {
+ return consistent.Config{
+ PartitionCount: ep.partitionCount,
+ ReplicationFactor: ep.replicationFactor,
+ Load: ep.load,
+ Hasher: hasher{},
+ }
+}
+
+// loadServices loads the services (adapters) and device types in memory. Because of the small size of the data and
+// the data format in the dB being binary protobuf then it is better to load all the data if inconsistency is detected,
+// instead of watching for updates in the dB and acting on it.
+func (ep *endpointManager) loadServices() error {
+ ep.servicesLock.Lock()
+ defer ep.servicesLock.Unlock()
+ ep.deviceTypeServiceMapLock.Lock()
+ defer ep.deviceTypeServiceMapLock.Unlock()
+
+ if ep.backend == nil {
+ return status.Error(codes.Aborted, "backend-not-set")
+ }
+ ep.services = make(map[string]*service)
+ ep.deviceTypeServiceMap = make(map[string]string)
+
+ // Load the adapters
+ blobs, err := ep.backend.List(context.Background(), "adapters")
+ if err != nil {
+ return err
+ }
+
+ // Data is marshalled as proto bytes in the data store
+ for _, blob := range blobs {
+ data := blob.Value.([]byte)
+ adapter := &voltha.Adapter{}
+ if err := proto.Unmarshal(data, adapter); err != nil {
+ return err
+ }
+ // A valid adapter should have the vendorID set
+ if adapter.Vendor != "" {
+ if _, ok := ep.services[adapter.Type]; !ok {
+ ep.services[adapter.Type] = &service{
+ id: adapter.Type,
+ totalReplicas: adapter.TotalReplicas,
+ replicas: make(map[ReplicaID]Endpoint),
+ consistentRing: consistent.New(nil, ep.getConsistentConfig()),
+ }
+
+ }
+ currentReplica := ReplicaID(adapter.CurrentReplica)
+ endpoint := Endpoint(adapter.Endpoint)
+ ep.services[adapter.Type].replicas[currentReplica] = endpoint
+ ep.services[adapter.Type].consistentRing.Add(newMember(adapter.Id, adapter.Type, adapter.Vendor, endpoint, adapter.Version, currentReplica))
+ }
+ }
+ // Load the device types
+ blobs, err = ep.backend.List(context.Background(), "device_types")
+ if err != nil {
+ return err
+ }
+ for _, blob := range blobs {
+ data := blob.Value.([]byte)
+ deviceType := &voltha.DeviceType{}
+ if err := proto.Unmarshal(data, deviceType); err != nil {
+ return err
+ }
+ if _, ok := ep.deviceTypeServiceMap[deviceType.Id]; !ok {
+ ep.deviceTypeServiceMap[deviceType.Id] = deviceType.Adapter
+ }
+ }
+
+ // Log the loaded data in debug mode to facilitate trouble shooting
+ if logger.V(log.DebugLevel) {
+ for key, val := range ep.services {
+ members := val.consistentRing.GetMembers()
+ logger.Debugw("service", log.Fields{"service": key, "expected-replica": val.totalReplicas, "replicas": len(val.consistentRing.GetMembers())})
+ for _, m := range members {
+ n := m.(Member)
+ logger.Debugw("service-loaded", log.Fields{"serviceId": n.getID(), "serviceType": n.getServiceType(), "replica": n.getReplica(), "endpoint": n.getEndPoint()})
+ }
+ }
+ logger.Debugw("device-types-loaded", log.Fields{"device-types": ep.deviceTypeServiceMap})
+ }
+ return nil
+}
+
+// makeKey creates the string that the hash function uses to create the hash
+func (ep *endpointManager) makeKey(deviceID string, deviceType string, serviceType string) []byte {
+ return []byte(fmt.Sprintf("%s_%s_%s", serviceType, deviceType, deviceID))
+}
+
+// The consistent package requires a hasher function
+type hasher struct{}
+
+// Sum64 provides the hasher function. Based upon numerous testing scenarios, the xxhash package seems to provide the
+// best distribution compare to other hash packages
+func (h hasher) Sum64(data []byte) uint64 {
+ return xxhash.Sum64(data)
+}
+
+// Member represents a member on the consistent ring
+type Member interface {
+ String() string
+ getReplica() ReplicaID
+ getEndPoint() Endpoint
+ getID() string
+ getServiceType() string
+}
+
+// member implements the Member interface
+type member struct {
+ id string
+ serviceType string
+ vendor string
+ version string
+ replica ReplicaID
+ endpoint Endpoint
+}
+
+func newMember(ID string, serviceType string, vendor string, endPoint Endpoint, version string, replica ReplicaID) Member {
+ return &member{
+ id: ID,
+ serviceType: serviceType,
+ vendor: vendor,
+ version: version,
+ replica: replica,
+ endpoint: endPoint,
+ }
+}
+
+func (m *member) String() string {
+ return string(m.endpoint)
+}
+
+func (m *member) getReplica() ReplicaID {
+ return m.replica
+}
+
+func (m *member) getEndPoint() Endpoint {
+ return m.endpoint
+}
+
+func (m *member) getID() string {
+ return m.id
+}
+
+func (m *member) getServiceType() string {
+ return m.serviceType
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/common.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd/common.go
similarity index 98%
copy from vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/common.go
copy to vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd/common.go
index 90612bb..a45b4b2 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/common.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd/common.go
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package mocks
+package etcd
import (
"github.com/opencord/voltha-lib-go/v3/pkg/log"
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd_server.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd/etcd_server.go
similarity index 99%
rename from vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd_server.go
rename to vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd/etcd_server.go
index 487b991..b4e201d 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd_server.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd/etcd_server.go
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package mocks
+package etcd
import (
"fmt"
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/common.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka/common.go
similarity index 98%
rename from vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/common.go
rename to vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka/common.go
index 90612bb..05bc5f9 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/common.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka/common.go
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package mocks
+package kafka
import (
"github.com/opencord/voltha-lib-go/v3/pkg/log"
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka/endpoint_manager.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka/endpoint_manager.go
new file mode 100644
index 0000000..fedbebf
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka/endpoint_manager.go
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka
+
+import (
+ "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
+)
+
+type EndpointManager struct{}
+
+func NewEndpointManager() kafka.EndpointManager {
+ mock := &EndpointManager{}
+ return mock
+}
+
+func (em *EndpointManager) GetEndpoint(deviceID string, serviceType string) (kafka.Endpoint, error) {
+ // TODO add mocks call and args
+ return kafka.Endpoint(serviceType), nil
+}
+
+func (em *EndpointManager) IsDeviceOwnedByService(deviceID string, serviceType string, replicaNumber int32) (bool, error) {
+ // TODO add mocks call and args
+ return true, nil
+}
+
+func (em *EndpointManager) GetReplicaAssignment(deviceID string, serviceType string) (kafka.ReplicaID, error) {
+ return kafka.ReplicaID(1), nil
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka_client.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka/kafka_client.go
similarity index 99%
rename from vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka_client.go
rename to vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka/kafka_client.go
index 38f147e..5922ce2 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka_client.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka/kafka_client.go
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package mocks
+package kafka
import (
"fmt"
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka_inter_container_proxy.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka/kafka_inter_container_proxy.go
similarity index 99%
rename from vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka_inter_container_proxy.go
rename to vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka/kafka_inter_container_proxy.go
index 9879830..34aec95 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka_inter_container_proxy.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka/kafka_inter_container_proxy.go
@@ -14,7 +14,7 @@
* limitations under the License.
*/
-package mocks
+package kafka
import (
"context"
diff --git a/vendor/github.com/opencord/voltha-protos/v3/go/voltha/adapter.pb.go b/vendor/github.com/opencord/voltha-protos/v3/go/voltha/adapter.pb.go
index 4cc76e0..9359dc1 100644
--- a/vendor/github.com/opencord/voltha-protos/v3/go/voltha/adapter.pb.go
+++ b/vendor/github.com/opencord/voltha-protos/v3/go/voltha/adapter.pb.go
@@ -65,8 +65,8 @@
// Adapter (software plugin)
type Adapter struct {
- // Unique name of adapter, matching the python package name under
- // voltha/adapters.
+ // the adapter ID has to be unique,
+ // it will be generated as Type + CurrentReplica
Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
Vendor string `protobuf:"bytes,2,opt,name=vendor,proto3" json:"vendor,omitempty"`
Version string `protobuf:"bytes,3,opt,name=version,proto3" json:"version,omitempty"`
@@ -76,10 +76,16 @@
AdditionalDescription *any.Any `protobuf:"bytes,64,opt,name=additional_description,json=additionalDescription,proto3" json:"additional_description,omitempty"`
LogicalDeviceIds []string `protobuf:"bytes,4,rep,name=logical_device_ids,json=logicalDeviceIds,proto3" json:"logical_device_ids,omitempty"`
// timestamp when the adapter last sent a message to the core
- LastCommunication *timestamp.Timestamp `protobuf:"bytes,5,opt,name=last_communication,json=lastCommunication,proto3" json:"last_communication,omitempty"`
- XXX_NoUnkeyedLiteral struct{} `json:"-"`
- XXX_unrecognized []byte `json:"-"`
- XXX_sizecache int32 `json:"-"`
+ LastCommunication *timestamp.Timestamp `protobuf:"bytes,5,opt,name=last_communication,json=lastCommunication,proto3" json:"last_communication,omitempty"`
+ CurrentReplica int32 `protobuf:"varint,6,opt,name=currentReplica,proto3" json:"currentReplica,omitempty"`
+ TotalReplicas int32 `protobuf:"varint,7,opt,name=totalReplicas,proto3" json:"totalReplicas,omitempty"`
+ Endpoint string `protobuf:"bytes,8,opt,name=endpoint,proto3" json:"endpoint,omitempty"`
+ // all replicas of the same adapter will have the same type
+ // it is used to associate a device to an adapter
+ Type string `protobuf:"bytes,9,opt,name=type,proto3" json:"type,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
}
func (m *Adapter) Reset() { *m = Adapter{} }
@@ -156,6 +162,34 @@
return nil
}
+func (m *Adapter) GetCurrentReplica() int32 {
+ if m != nil {
+ return m.CurrentReplica
+ }
+ return 0
+}
+
+func (m *Adapter) GetTotalReplicas() int32 {
+ if m != nil {
+ return m.TotalReplicas
+ }
+ return 0
+}
+
+func (m *Adapter) GetEndpoint() string {
+ if m != nil {
+ return m.Endpoint
+ }
+ return ""
+}
+
+func (m *Adapter) GetType() string {
+ if m != nil {
+ return m.Type
+ }
+ return ""
+}
+
type Adapters struct {
Items []*Adapter `protobuf:"bytes,1,rep,name=items,proto3" json:"items,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
@@ -204,31 +238,34 @@
func init() { proto.RegisterFile("voltha_protos/adapter.proto", fileDescriptor_7e998ce153307274) }
var fileDescriptor_7e998ce153307274 = []byte{
- // 405 bytes of a gzipped FileDescriptorProto
- 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x92, 0xd1, 0x6a, 0xdb, 0x30,
- 0x14, 0x86, 0x71, 0xb2, 0xb8, 0xab, 0x4a, 0x59, 0xaa, 0x2d, 0xc3, 0xf3, 0x28, 0x35, 0x81, 0x81,
- 0x2f, 0x56, 0x99, 0xb5, 0x2f, 0xb0, 0xa4, 0xbd, 0xe9, 0xad, 0x28, 0xbb, 0xd8, 0x8d, 0x51, 0x24,
- 0xd5, 0x15, 0xd8, 0x3a, 0xc6, 0x52, 0x0c, 0x7d, 0xc8, 0xbd, 0xc1, 0x1e, 0x60, 0x4f, 0xb0, 0xeb,
- 0x11, 0x49, 0x26, 0x4e, 0x06, 0xbd, 0x32, 0xfa, 0xbf, 0xff, 0xfc, 0xe7, 0x1c, 0xc9, 0xe8, 0x73,
- 0x0f, 0xb5, 0x7d, 0x66, 0x65, 0xdb, 0x81, 0x05, 0x53, 0x30, 0xc1, 0x5a, 0x2b, 0x3b, 0xe2, 0x8e,
- 0x38, 0xf6, 0x30, 0xfd, 0x54, 0x01, 0x54, 0xb5, 0x2c, 0x9c, 0xba, 0xd9, 0x3e, 0x15, 0x4c, 0xbf,
- 0x78, 0x4b, 0x9a, 0x1e, 0xd6, 0x73, 0x68, 0x1a, 0xd0, 0x81, 0x25, 0x87, 0xac, 0x91, 0x96, 0x05,
- 0x72, 0x75, 0x1c, 0x68, 0x55, 0x23, 0x8d, 0x65, 0x4d, 0xeb, 0x0d, 0x4b, 0x8a, 0xce, 0x57, 0x7e,
- 0x94, 0x3b, 0xd0, 0x4f, 0xaa, 0xc2, 0x2b, 0x74, 0xc1, 0x84, 0x50, 0x56, 0x81, 0x66, 0x75, 0xc9,
- 0x9d, 0x98, 0x7c, 0xcf, 0xa2, 0xfc, 0xec, 0xe6, 0x03, 0xf1, 0x69, 0x64, 0x48, 0x23, 0x2b, 0xfd,
- 0x42, 0xe7, 0x7b, 0xbb, 0x8f, 0x58, 0xfe, 0x9e, 0xa0, 0x93, 0x10, 0x8a, 0x17, 0x68, 0xa2, 0x44,
- 0x12, 0x65, 0x51, 0x7e, 0xba, 0x9e, 0xfd, 0xf9, 0xfb, 0xeb, 0x32, 0xa2, 0x13, 0x25, 0xf0, 0x25,
- 0x8a, 0x7b, 0xa9, 0x05, 0x74, 0xc9, 0x64, 0x8c, 0x82, 0x88, 0xaf, 0xd0, 0x49, 0x2f, 0x3b, 0xa3,
- 0x40, 0x27, 0xd3, 0x31, 0x1f, 0x54, 0x7c, 0x8d, 0xe2, 0x30, 0xda, 0xdc, 0x8d, 0xb6, 0x20, 0xfe,
- 0x0a, 0xc8, 0xc1, 0x32, 0x34, 0x98, 0x30, 0x45, 0x1f, 0x47, 0x4b, 0x09, 0x69, 0x78, 0xa7, 0xda,
- 0xdd, 0xe9, 0xb5, 0xcd, 0x86, 0xa6, 0x8b, 0x7d, 0xe9, 0xfd, 0xbe, 0x12, 0x7f, 0x45, 0xb8, 0x86,
- 0x4a, 0x71, 0x17, 0xd8, 0x2b, 0x2e, 0x4b, 0x25, 0x4c, 0xf2, 0x26, 0x9b, 0xe6, 0xa7, 0x74, 0x1e,
- 0xc8, 0xbd, 0x03, 0x0f, 0xc2, 0xe0, 0x07, 0x84, 0x6b, 0x66, 0x6c, 0xb9, 0x7b, 0xb7, 0xad, 0x56,
- 0x9c, 0xb9, 0xee, 0x33, 0xd7, 0x3d, 0xfd, 0xaf, 0xfb, 0xe3, 0xf0, 0x4a, 0xf4, 0x62, 0x57, 0x75,
- 0x37, 0x2e, 0x5a, 0x7e, 0x43, 0x6f, 0xc3, 0x96, 0x06, 0x7f, 0x41, 0x33, 0x65, 0x65, 0x63, 0x92,
- 0x28, 0x9b, 0xe6, 0x67, 0x37, 0xef, 0x8e, 0xae, 0x81, 0x7a, 0xba, 0x7e, 0x44, 0xef, 0xa1, 0xab,
- 0x08, 0xb4, 0x52, 0x73, 0xe8, 0x44, 0x70, 0xad, 0xcf, 0x7f, 0xb8, 0x6f, 0x30, 0xff, 0x24, 0x95,
- 0xb2, 0xcf, 0xdb, 0x0d, 0xe1, 0xd0, 0x14, 0x83, 0xb5, 0xf0, 0xd6, 0xeb, 0xf0, 0x6b, 0xf5, 0xb7,
- 0x45, 0x05, 0x41, 0xdb, 0xc4, 0x4e, 0xbc, 0xfd, 0x17, 0x00, 0x00, 0xff, 0xff, 0x4e, 0xc5, 0xdf,
- 0x09, 0xdb, 0x02, 0x00, 0x00,
+ // 463 bytes of a gzipped FileDescriptorProto
+ 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x52, 0xdd, 0x6e, 0xd3, 0x30,
+ 0x14, 0x56, 0xda, 0x35, 0x6d, 0x3d, 0x15, 0x3a, 0x43, 0x91, 0x09, 0x9a, 0x16, 0x55, 0x80, 0x72,
+ 0xc1, 0x12, 0xb1, 0xbd, 0x00, 0xed, 0x76, 0xb3, 0x5b, 0x6b, 0xe2, 0x82, 0x9b, 0xca, 0xb5, 0xbd,
+ 0xcc, 0x52, 0xe2, 0x13, 0xc5, 0x6e, 0xa4, 0x3e, 0x24, 0x2f, 0xc0, 0x13, 0xf0, 0x04, 0x5c, 0xa3,
+ 0x3a, 0x0e, 0xfd, 0x41, 0xda, 0x55, 0x72, 0xbe, 0xef, 0x3b, 0xdf, 0xf9, 0x33, 0xfa, 0xd0, 0x40,
+ 0x61, 0x9f, 0xd9, 0xaa, 0xaa, 0xc1, 0x82, 0xc9, 0x98, 0x60, 0x95, 0x95, 0x75, 0xea, 0x42, 0x1c,
+ 0xb6, 0x64, 0xf4, 0x3e, 0x07, 0xc8, 0x0b, 0x99, 0x39, 0x74, 0xbd, 0x79, 0xca, 0x98, 0xde, 0xb6,
+ 0x92, 0x28, 0x3a, 0xce, 0xe7, 0x50, 0x96, 0xa0, 0x3d, 0x47, 0x8e, 0xb9, 0x52, 0x5a, 0xe6, 0x99,
+ 0xab, 0x53, 0x43, 0xab, 0x4a, 0x69, 0x2c, 0x2b, 0xab, 0x56, 0x30, 0xa7, 0x68, 0xb2, 0x68, 0x5b,
+ 0xb9, 0x03, 0xfd, 0xa4, 0x72, 0xbc, 0x40, 0x17, 0x4c, 0x08, 0x65, 0x15, 0x68, 0x56, 0xac, 0xb8,
+ 0x03, 0xc9, 0xb7, 0x38, 0x48, 0xce, 0x6f, 0xde, 0xa6, 0xad, 0x5b, 0xda, 0xb9, 0xa5, 0x0b, 0xbd,
+ 0xa5, 0xd3, 0xbd, 0xbc, 0xb5, 0x98, 0xff, 0xea, 0xa3, 0xa1, 0x37, 0xc5, 0x33, 0xd4, 0x53, 0x82,
+ 0x04, 0x71, 0x90, 0x8c, 0x97, 0x83, 0xdf, 0x7f, 0x7e, 0x5e, 0x06, 0xb4, 0xa7, 0x04, 0xbe, 0x44,
+ 0x61, 0x23, 0xb5, 0x80, 0x9a, 0xf4, 0x0e, 0x29, 0x0f, 0xe2, 0x2b, 0x34, 0x6c, 0x64, 0x6d, 0x14,
+ 0x68, 0xd2, 0x3f, 0xe4, 0x3b, 0x14, 0x5f, 0xa3, 0xd0, 0xb7, 0x36, 0x75, 0xad, 0xcd, 0xd2, 0x76,
+ 0x05, 0xe9, 0xd1, 0x30, 0xd4, 0x8b, 0x30, 0x45, 0xef, 0x0e, 0x86, 0x12, 0xd2, 0xf0, 0x5a, 0x55,
+ 0xbb, 0xe8, 0xa5, 0xc9, 0xba, 0xa2, 0xb3, 0x7d, 0xea, 0xfd, 0x3e, 0x13, 0x7f, 0x41, 0xb8, 0x80,
+ 0x5c, 0x71, 0x67, 0xd8, 0x28, 0x2e, 0x57, 0x4a, 0x18, 0x72, 0x16, 0xf7, 0x93, 0x31, 0x9d, 0x7a,
+ 0xe6, 0xde, 0x11, 0x0f, 0xc2, 0xe0, 0x07, 0x84, 0x0b, 0x66, 0xec, 0x6a, 0x77, 0xb7, 0x8d, 0x56,
+ 0x9c, 0xb9, 0xea, 0x03, 0x57, 0x3d, 0xfa, 0xaf, 0xfa, 0x63, 0x77, 0x25, 0x7a, 0xb1, 0xcb, 0xba,
+ 0x3b, 0x4c, 0xc2, 0x9f, 0xd1, 0x2b, 0xbe, 0xa9, 0x6b, 0xa9, 0x2d, 0x95, 0x55, 0xa1, 0x38, 0x23,
+ 0x61, 0x1c, 0x24, 0x03, 0x7a, 0x82, 0xe2, 0x8f, 0x68, 0x62, 0xc1, 0xb2, 0xc2, 0xc7, 0x86, 0x0c,
+ 0x9d, 0xec, 0x18, 0xc4, 0x11, 0x1a, 0x49, 0x2d, 0x2a, 0x50, 0xda, 0x92, 0xd1, 0x6e, 0xd7, 0xf4,
+ 0x5f, 0x8c, 0x31, 0x3a, 0xb3, 0xdb, 0x4a, 0x92, 0xb1, 0xc3, 0xdd, 0xff, 0xfc, 0x2b, 0x1a, 0xf9,
+ 0x1d, 0x1b, 0xfc, 0x09, 0x0d, 0x94, 0x95, 0xa5, 0x21, 0x41, 0xdc, 0x4f, 0xce, 0x6f, 0x5e, 0x9f,
+ 0x1c, 0x81, 0xb6, 0xec, 0xf2, 0x11, 0xbd, 0x81, 0x3a, 0x4f, 0xa1, 0x92, 0x9a, 0x43, 0x2d, 0xbc,
+ 0x6a, 0x39, 0xf9, 0xee, 0xbe, 0x5e, 0xfc, 0x23, 0xcd, 0x95, 0x7d, 0xde, 0xac, 0x53, 0x0e, 0x65,
+ 0xd6, 0x49, 0xb3, 0x56, 0x7a, 0xed, 0x1f, 0x76, 0x73, 0x9b, 0xe5, 0xe0, 0xb1, 0x75, 0xe8, 0xc0,
+ 0xdb, 0xbf, 0x01, 0x00, 0x00, 0xff, 0xff, 0xef, 0x64, 0x5e, 0x10, 0x59, 0x03, 0x00, 0x00,
}
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 5b3c29c..8bd422c 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -8,6 +8,10 @@
github.com/beorn7/perks/quantile
# github.com/bsm/sarama-cluster v2.1.15+incompatible
github.com/bsm/sarama-cluster
+# github.com/buraksezer/consistent v0.0.0-20191006190839-693edf70fd72
+github.com/buraksezer/consistent
+# github.com/cespare/xxhash v1.1.0
+github.com/cespare/xxhash
# github.com/cevaris/ordered_map v0.0.0-20190319150403-3adeae072e73
github.com/cevaris/ordered_map
# github.com/coreos/go-semver v0.2.0
@@ -95,7 +99,7 @@
github.com/modern-go/concurrent
# github.com/modern-go/reflect2 v1.0.1
github.com/modern-go/reflect2
-# github.com/opencord/voltha-lib-go/v3 v3.1.0
+# github.com/opencord/voltha-lib-go/v3 v3.1.2
github.com/opencord/voltha-lib-go/v3/pkg/adapters
github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif
github.com/opencord/voltha-lib-go/v3/pkg/adapters/common
@@ -106,10 +110,11 @@
github.com/opencord/voltha-lib-go/v3/pkg/grpc
github.com/opencord/voltha-lib-go/v3/pkg/kafka
github.com/opencord/voltha-lib-go/v3/pkg/log
-github.com/opencord/voltha-lib-go/v3/pkg/mocks
+github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd
+github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka
github.com/opencord/voltha-lib-go/v3/pkg/probe
github.com/opencord/voltha-lib-go/v3/pkg/version
-# github.com/opencord/voltha-protos/v3 v3.2.8
+# github.com/opencord/voltha-protos/v3 v3.3.0
github.com/opencord/voltha-protos/v3/go/common
github.com/opencord/voltha-protos/v3/go/inter_container
github.com/opencord/voltha-protos/v3/go/omci