[VOL-2836] Using different topic per ONU device
Change-Id: Ic0328f789c19dfedd86616df527c5ba510de0de9
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/adapter_proxy.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/adapter_proxy.go
index 02fa3de..bbae0ed 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/adapter_proxy.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/adapter_proxy.go
@@ -17,6 +17,7 @@
import (
"context"
+ "github.com/opencord/voltha-lib-go/v3/pkg/db"
"time"
"github.com/golang/protobuf/proto"
@@ -32,14 +33,17 @@
kafkaICProxy kafka.InterContainerProxy
adapterTopic string
coreTopic string
+ endpointMgr kafka.EndpointManager
}
-func NewAdapterProxy(kafkaProxy kafka.InterContainerProxy, adapterTopic string, coreTopic string) *AdapterProxy {
- var proxy AdapterProxy
- proxy.kafkaICProxy = kafkaProxy
- proxy.adapterTopic = adapterTopic
- proxy.coreTopic = coreTopic
- logger.Debugw("TOPICS", log.Fields{"core": proxy.coreTopic, "adapter": proxy.adapterTopic})
+func NewAdapterProxy(kafkaProxy kafka.InterContainerProxy, adapterTopic string, coreTopic string, backend *db.Backend) *AdapterProxy {
+ proxy := AdapterProxy{
+ kafkaICProxy: kafkaProxy,
+ adapterTopic: adapterTopic,
+ coreTopic: coreTopic,
+ endpointMgr: kafka.NewEndpointManager(backend),
+ }
+ logger.Debugw("topics", log.Fields{"core": proxy.coreTopic, "adapter": proxy.adapterTopic})
return &proxy
}
@@ -87,7 +91,11 @@
}
// Set up the required rpc arguments
- topic := kafka.Topic{Name: toAdapter}
+ endpoint, err := ap.endpointMgr.GetEndpoint(toDeviceId, toAdapter)
+ if err != nil {
+ return err
+ }
+ topic := kafka.Topic{Name: string(endpoint)}
replyToTopic := kafka.Topic{Name: fromAdapter}
rpc := "process_inter_adapter_message"
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/core_proxy.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/core_proxy.go
index 9582f33..20e1a52 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/core_proxy.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/core_proxy.go
@@ -99,6 +99,28 @@
topic := kafka.Topic{Name: ap.coreTopic}
replyToTopic := ap.getAdapterTopic()
args := make([]*kafka.KVArg, 2)
+
+ if adapter.TotalReplicas == 0 && adapter.CurrentReplica != 0 {
+ log.Fatal("totalReplicas can't be 0, since you're here you have at least one")
+ }
+
+ if adapter.CurrentReplica == 0 && adapter.TotalReplicas != 0 {
+ log.Fatal("currentReplica can't be 0, it has to start from 1")
+ }
+
+ if adapter.CurrentReplica == 0 && adapter.TotalReplicas == 0 {
+ // if the adapter is not setting these fields they default to 0,
+ // in that case it means the adapter is not ready to be scaled and thus it defaults
+ // to a single instance
+ adapter.CurrentReplica = 1
+ adapter.TotalReplicas = 1
+ }
+
+ if adapter.CurrentReplica > adapter.TotalReplicas {
+ log.Fatalf("CurrentReplica (%d) can't be greater than TotalReplicas (%d)",
+ adapter.CurrentReplica, adapter.TotalReplicas)
+ }
+
args[0] = &kafka.KVArg{
Key: "adapter",
Value: adapter,
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/configmanager.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/configmanager.go
index 9f08b0d..724ad32 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/configmanager.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/configmanager.go
@@ -63,10 +63,10 @@
ConfigAttribute string
}
-// ConfigManager is a wrapper over backend to maintain Configuration of voltha components
+// ConfigManager is a wrapper over Backend to maintain Configuration of voltha components
// in kvstore based persistent storage
type ConfigManager struct {
- backend *db.Backend
+ Backend *db.Backend
KvStoreConfigPrefix string
}
@@ -95,7 +95,7 @@
return &ConfigManager{
KvStoreConfigPrefix: defaultkvStoreConfigPath,
- backend: &db.Backend{
+ Backend: &db.Backend{
Client: kvClient,
StoreType: kvStoreType,
Host: kvStoreHost,
@@ -108,12 +108,12 @@
// RetrieveComponentList list the component Names for which loglevel is stored in kvstore
func (c *ConfigManager) RetrieveComponentList(ctx context.Context, configType ConfigType) ([]string, error) {
- data, err := c.backend.List(ctx, c.KvStoreConfigPrefix)
+ data, err := c.Backend.List(ctx, c.KvStoreConfigPrefix)
if err != nil {
return nil, err
}
- // Looping through the data recieved from the backend for config
+ // Looping through the data recieved from the Backend for config
// Trimming and Splitting the required key and value from data and storing as componentName,PackageName and Level
// For Example, recieved key would be <Backend Prefix Path>/<Config Prefix>/<Component Name>/<Config Type>/default and value \"DEBUG\"
// Then in default will be stored as PackageName,componentName as <Component Name> and DEBUG will be stored as value in List struct
@@ -168,14 +168,14 @@
c.changeEventChan = make(chan *ConfigChangeEvent, 1)
- c.kvStoreEventChan = c.cManager.backend.CreateWatch(ctx, key, true)
+ c.kvStoreEventChan = c.cManager.Backend.CreateWatch(ctx, key, true)
go c.processKVStoreWatchEvents()
return c.changeEventChan
}
-// processKVStoreWatchEvents process event channel recieved from the backend for any ChangeType
+// processKVStoreWatchEvents process event channel recieved from the Backend for any ChangeType
// It checks for the EventType is valid or not.For the valid EventTypes creates ConfigChangeEvent and send it on channel
func (c *ComponentConfig) processKVStoreWatchEvents() {
@@ -183,7 +183,7 @@
logger.Debugw("processing-kvstore-event-change", log.Fields{"key-prefix": ccKeyPrefix})
- ccPathPrefix := c.cManager.backend.PathPrefix + ccKeyPrefix + kvStorePathSeparator
+ ccPathPrefix := c.cManager.Backend.PathPrefix + ccKeyPrefix + kvStorePathSeparator
for watchResp := range c.kvStoreEventChan {
@@ -210,7 +210,7 @@
logger.Debugw("retrieving-config", log.Fields{"key": key})
- if kvpair, err := c.cManager.backend.Get(ctx, key); err != nil {
+ if kvpair, err := c.cManager.Backend.Get(ctx, key); err != nil {
return "", err
} else {
if kvpair == nil {
@@ -228,17 +228,17 @@
logger.Debugw("retreiving-list", log.Fields{"key": key})
- data, err := c.cManager.backend.List(ctx, key)
+ data, err := c.cManager.Backend.List(ctx, key)
if err != nil {
return nil, err
}
- // Looping through the data recieved from the backend for the given key
+ // Looping through the data recieved from the Backend for the given key
// Trimming the required key and value from data and storing as key/value pair
// For Example, recieved key would be <Backend Prefix Path>/<Config Prefix>/<Component Name>/<Config Type>/default and value \"DEBUG\"
// Then in default will be stored as key and DEBUG will be stored as value in map[string]string
res := make(map[string]string)
- ccPathPrefix := c.cManager.backend.PathPrefix + kvStorePathSeparator + key + kvStorePathSeparator
+ ccPathPrefix := c.cManager.Backend.PathPrefix + kvStorePathSeparator + key + kvStorePathSeparator
for attr, val := range data {
res[strings.TrimPrefix(attr, ccPathPrefix)] = strings.Trim(fmt.Sprintf("%s", val.Value), "\"")
}
@@ -252,7 +252,7 @@
logger.Debugw("saving-config", log.Fields{"key": key, "value": configValue})
//save the data for update config
- if err := c.cManager.backend.Put(ctx, key, configValue); err != nil {
+ if err := c.cManager.Backend.Put(ctx, key, configValue); err != nil {
return err
}
return nil
@@ -264,7 +264,7 @@
logger.Debugw("deleting-config", log.Fields{"key": key})
//delete the config
- if err := c.cManager.backend.Delete(ctx, key); err != nil {
+ if err := c.cManager.Backend.Delete(ctx, key); err != nil {
return err
}
return nil
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/logcontroller.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/logcontroller.go
index b929c9d..9c36241 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/logcontroller.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/logcontroller.go
@@ -15,7 +15,7 @@
*/
// Package Config provides dynamic logging configuration for specific Voltha component with loglevel lookup
-// from etcd kvstore implemented using backend.
+// from etcd kvstore implemented using Backend.
// Any Voltha component can start utilizing dynamic logging by starting goroutine of StartLogLevelConfigProcessing after
// starting kvClient for the component.
@@ -121,8 +121,8 @@
}
// ProcessLogConfig will first load and apply log config and then start waiting on component config and global config
-// channels for any changes. Event channel will be recieved from backend for valid change type
-// Then data for componentn log config and global log config will be retrieved from backend and stored in updatedLogConfig in precedence order
+// channels for any changes. Event channel will be recieved from Backend for valid change type
+// Then data for componentn log config and global log config will be retrieved from Backend and stored in updatedLogConfig in precedence order
// If any changes in updatedLogConfig will be applied on component
func (c *ComponentLogController) processLogConfig(ctx context.Context) {
@@ -247,7 +247,7 @@
return componentLogConfig, nil
}
-// buildUpdatedLogConfig retrieve the global logConfig and component logConfig from backend
+// buildUpdatedLogConfig retrieve the global logConfig and component logConfig from Backend
// component logConfig stores the log config with precedence order
// For example, If the global logConfig is set and component logConfig is set only for specific package then
// component logConfig is stored with global logConfig and component logConfig of specific package
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/endpoint_manager.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/endpoint_manager.go
new file mode 100644
index 0000000..4c13c76
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/endpoint_manager.go
@@ -0,0 +1,352 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka
+
+import (
+ "context"
+ "fmt"
+ "github.com/buraksezer/consistent"
+ "github.com/cespare/xxhash"
+ "github.com/golang/protobuf/proto"
+ "github.com/opencord/voltha-lib-go/v3/pkg/db"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ "github.com/opencord/voltha-protos/v3/go/voltha"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "sync"
+)
+
+const (
+ // All the values below can be tuned to get optimal data distribution. The numbers below seems to work well when
+ // supporting 1000-10000 devices and 1 - 20 replicas of a service
+
+ // Keys are distributed among partitions. Prime numbers are good to distribute keys uniformly.
+ DefaultPartitionCount = 1117
+
+ // Represents how many times a node is replicated on the consistent ring.
+ DefaultReplicationFactor = 117
+
+ // Load is used to calculate average load.
+ DefaultLoad = 1.1
+)
+
+type Endpoint string // Endpoint of a service instance. When using kafka, this is the topic name of a service
+type ReplicaID int32 // The replication ID of a service instance
+
+type EndpointManager interface {
+
+ // GetEndpoint is called to get the endpoint to communicate with for a specific device and service type. For
+ // now this will return the topic name
+ GetEndpoint(deviceID string, serviceType string) (Endpoint, error)
+
+ // IsDeviceOwnedByService is invoked when a specific service (service type + replicaNumber) is restarted and
+ // devices owned by that service need to be reconciled
+ IsDeviceOwnedByService(deviceID string, serviceType string, replicaNumber int32) (bool, error)
+
+ // getReplicaAssignment returns the replica number of the service that owns the deviceID. This is used by the
+ // test only
+ getReplicaAssignment(deviceID string, serviceType string) (ReplicaID, error)
+}
+
+type service struct {
+ id string // Id of the service. The same id is used for all replicas
+ totalReplicas int32
+ replicas map[ReplicaID]Endpoint
+ consistentRing *consistent.Consistent
+}
+
+type endpointManager struct {
+ partitionCount int
+ replicationFactor int
+ load float64
+ backend *db.Backend
+ services map[string]*service
+ servicesLock sync.RWMutex
+ deviceTypeServiceMap map[string]string
+ deviceTypeServiceMapLock sync.RWMutex
+}
+
+type EndpointManagerOption func(*endpointManager)
+
+func PartitionCount(count int) EndpointManagerOption {
+ return func(args *endpointManager) {
+ args.partitionCount = count
+ }
+}
+
+func ReplicationFactor(replicas int) EndpointManagerOption {
+ return func(args *endpointManager) {
+ args.replicationFactor = replicas
+ }
+}
+
+func Load(load float64) EndpointManagerOption {
+ return func(args *endpointManager) {
+ args.load = load
+ }
+}
+
+func newEndpointManager(backend *db.Backend, opts ...EndpointManagerOption) EndpointManager {
+ tm := &endpointManager{
+ partitionCount: DefaultPartitionCount,
+ replicationFactor: DefaultReplicationFactor,
+ load: DefaultLoad,
+ backend: backend,
+ services: make(map[string]*service),
+ deviceTypeServiceMap: make(map[string]string),
+ }
+
+ for _, option := range opts {
+ option(tm)
+ }
+ return tm
+}
+
+func NewEndpointManager(backend *db.Backend, opts ...EndpointManagerOption) EndpointManager {
+ return newEndpointManager(backend, opts...)
+}
+
+func (ep *endpointManager) GetEndpoint(deviceID string, serviceType string) (Endpoint, error) {
+ logger.Debugw("getting-endpoint", log.Fields{"device-id": deviceID, "service": serviceType})
+ owner, err := ep.getOwner(deviceID, serviceType)
+ if err != nil {
+ return "", err
+ }
+ m, ok := owner.(Member)
+ if !ok {
+ return "", status.Errorf(codes.Aborted, "invalid-member-%v", owner)
+ }
+ endpoint := m.getEndPoint()
+ if endpoint == "" {
+ return "", status.Errorf(codes.Unavailable, "endpoint-not-set-%s", serviceType)
+ }
+ logger.Debugw("returning-endpoint", log.Fields{"device-id": deviceID, "service": serviceType, "endpoint": endpoint})
+ return endpoint, nil
+}
+
+func (ep *endpointManager) IsDeviceOwnedByService(deviceID string, serviceType string, replicaNumber int32) (bool, error) {
+ logger.Debugw("device-ownership", log.Fields{"device-id": deviceID, "service": serviceType, "replica-number": replicaNumber})
+ owner, err := ep.getOwner(deviceID, serviceType)
+ if err != nil {
+ return false, nil
+ }
+ m, ok := owner.(Member)
+ if !ok {
+ return false, status.Errorf(codes.Aborted, "invalid-member-%v", owner)
+ }
+ return m.getReplica() == ReplicaID(replicaNumber), nil
+}
+
+func (ep *endpointManager) getReplicaAssignment(deviceID string, serviceType string) (ReplicaID, error) {
+ owner, err := ep.getOwner(deviceID, serviceType)
+ if err != nil {
+ return 0, nil
+ }
+ m, ok := owner.(Member)
+ if !ok {
+ return 0, status.Errorf(codes.Aborted, "invalid-member-%v", owner)
+ }
+ return m.getReplica(), nil
+}
+
+func (ep *endpointManager) getOwner(deviceID string, serviceType string) (consistent.Member, error) {
+ serv, dType, err := ep.getServiceAndDeviceType(serviceType)
+ if err != nil {
+ return nil, err
+ }
+ key := ep.makeKey(deviceID, dType, serviceType)
+ return serv.consistentRing.LocateKey(key), nil
+}
+
+func (ep *endpointManager) getServiceAndDeviceType(serviceType string) (*service, string, error) {
+ // Check whether service exist
+ ep.servicesLock.RLock()
+ serv, serviceExist := ep.services[serviceType]
+ ep.servicesLock.RUnlock()
+
+ // Load the service and device types if needed
+ if !serviceExist || serv == nil || int(serv.totalReplicas) != len(serv.consistentRing.GetMembers()) {
+ if err := ep.loadServices(); err != nil {
+ return nil, "", err
+ }
+
+ // Check whether the service exists now
+ ep.servicesLock.RLock()
+ serv, serviceExist = ep.services[serviceType]
+ ep.servicesLock.RUnlock()
+ if !serviceExist || serv == nil || int(serv.totalReplicas) != len(serv.consistentRing.GetMembers()) {
+ return nil, "", status.Errorf(codes.NotFound, "service-%s", serviceType)
+ }
+ }
+
+ ep.deviceTypeServiceMapLock.RLock()
+ defer ep.deviceTypeServiceMapLock.RUnlock()
+ for dType, sType := range ep.deviceTypeServiceMap {
+ if sType == serviceType {
+ return serv, dType, nil
+ }
+ }
+ return nil, "", status.Errorf(codes.NotFound, "service-%s", serviceType)
+}
+
+func (ep *endpointManager) getConsistentConfig() consistent.Config {
+ return consistent.Config{
+ PartitionCount: ep.partitionCount,
+ ReplicationFactor: ep.replicationFactor,
+ Load: ep.load,
+ Hasher: hasher{},
+ }
+}
+
+// loadServices loads the services (adapters) and device types in memory. Because of the small size of the data and
+// the data format in the dB being binary protobuf then it is better to load all the data if inconsistency is detected,
+// instead of watching for updates in the dB and acting on it.
+func (ep *endpointManager) loadServices() error {
+ ep.servicesLock.Lock()
+ defer ep.servicesLock.Unlock()
+ ep.deviceTypeServiceMapLock.Lock()
+ defer ep.deviceTypeServiceMapLock.Unlock()
+
+ if ep.backend == nil {
+ return status.Error(codes.Aborted, "backend-not-set")
+ }
+ ep.services = make(map[string]*service)
+ ep.deviceTypeServiceMap = make(map[string]string)
+
+ // Load the adapters
+ blobs, err := ep.backend.List(context.Background(), "adapters")
+ if err != nil {
+ return err
+ }
+
+ // Data is marshalled as proto bytes in the data store
+ for _, blob := range blobs {
+ data := blob.Value.([]byte)
+ adapter := &voltha.Adapter{}
+ if err := proto.Unmarshal(data, adapter); err != nil {
+ return err
+ }
+ // A valid adapter should have the vendorID set
+ if adapter.Vendor != "" {
+ if _, ok := ep.services[adapter.Type]; !ok {
+ ep.services[adapter.Type] = &service{
+ id: adapter.Type,
+ totalReplicas: adapter.TotalReplicas,
+ replicas: make(map[ReplicaID]Endpoint),
+ consistentRing: consistent.New(nil, ep.getConsistentConfig()),
+ }
+
+ }
+ currentReplica := ReplicaID(adapter.CurrentReplica)
+ endpoint := Endpoint(adapter.Endpoint)
+ ep.services[adapter.Type].replicas[currentReplica] = endpoint
+ ep.services[adapter.Type].consistentRing.Add(newMember(adapter.Id, adapter.Type, adapter.Vendor, endpoint, adapter.Version, currentReplica))
+ }
+ }
+ // Load the device types
+ blobs, err = ep.backend.List(context.Background(), "device_types")
+ if err != nil {
+ return err
+ }
+ for _, blob := range blobs {
+ data := blob.Value.([]byte)
+ deviceType := &voltha.DeviceType{}
+ if err := proto.Unmarshal(data, deviceType); err != nil {
+ return err
+ }
+ if _, ok := ep.deviceTypeServiceMap[deviceType.Id]; !ok {
+ ep.deviceTypeServiceMap[deviceType.Id] = deviceType.Adapter
+ }
+ }
+
+ // Log the loaded data in debug mode to facilitate trouble shooting
+ if logger.V(log.DebugLevel) {
+ for key, val := range ep.services {
+ members := val.consistentRing.GetMembers()
+ logger.Debugw("service", log.Fields{"service": key, "expected-replica": val.totalReplicas, "replicas": len(val.consistentRing.GetMembers())})
+ for _, m := range members {
+ n := m.(Member)
+ logger.Debugw("service-loaded", log.Fields{"serviceId": n.getID(), "serviceType": n.getServiceType(), "replica": n.getReplica(), "endpoint": n.getEndPoint()})
+ }
+ }
+ logger.Debugw("device-types-loaded", log.Fields{"device-types": ep.deviceTypeServiceMap})
+ }
+ return nil
+}
+
+// makeKey creates the string that the hash function uses to create the hash
+func (ep *endpointManager) makeKey(deviceID string, deviceType string, serviceType string) []byte {
+ return []byte(fmt.Sprintf("%s_%s_%s", serviceType, deviceType, deviceID))
+}
+
+// The consistent package requires a hasher function
+type hasher struct{}
+
+// Sum64 provides the hasher function. Based upon numerous testing scenarios, the xxhash package seems to provide the
+// best distribution compare to other hash packages
+func (h hasher) Sum64(data []byte) uint64 {
+ return xxhash.Sum64(data)
+}
+
+// Member represents a member on the consistent ring
+type Member interface {
+ String() string
+ getReplica() ReplicaID
+ getEndPoint() Endpoint
+ getID() string
+ getServiceType() string
+}
+
+// member implements the Member interface
+type member struct {
+ id string
+ serviceType string
+ vendor string
+ version string
+ replica ReplicaID
+ endpoint Endpoint
+}
+
+func newMember(ID string, serviceType string, vendor string, endPoint Endpoint, version string, replica ReplicaID) Member {
+ return &member{
+ id: ID,
+ serviceType: serviceType,
+ vendor: vendor,
+ version: version,
+ replica: replica,
+ endpoint: endPoint,
+ }
+}
+
+func (m *member) String() string {
+ return string(m.endpoint)
+}
+
+func (m *member) getReplica() ReplicaID {
+ return m.replica
+}
+
+func (m *member) getEndPoint() Endpoint {
+ return m.endpoint
+}
+
+func (m *member) getID() string {
+ return m.id
+}
+
+func (m *member) getServiceType() string {
+ return m.serviceType
+}