VOL-2999 - Reworked how Proxies are created & used.
Added DB Paths to separate location specification logic from entry access logic.
Also merged Update() and AddWithID() and renamed to Set().
Change-Id: I9ed5eafd63c180dddc5845a166554f89bda12325
diff --git a/db/model/proxy.go b/db/model/proxy.go
index 997ebe4..cd35507 100644
--- a/db/model/proxy.go
+++ b/db/model/proxy.go
@@ -20,10 +20,12 @@
"context"
"errors"
"fmt"
+ "reflect"
+ "strings"
+
"github.com/gogo/protobuf/proto"
"github.com/opencord/voltha-lib-go/v3/pkg/db"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
- "reflect"
)
// RequestTimestamp attribute used to store a timestamp in the context object
@@ -31,30 +33,43 @@
type contextKey string
-// Proxy holds the information for a specific location with the data model
-type Proxy struct {
+// Path holds the information for a specific location within the data model
+type Path struct {
kvStore *db.Backend
path string
}
-// NewProxy instantiates a new proxy to a specific location
-func NewProxy(kvStore *db.Backend, path string) *Proxy {
- if path == "/" {
- path = ""
- }
- return &Proxy{
- kvStore: kvStore,
- path: path,
+// NewDBPath returns a path to the default db location
+func NewDBPath(kvStore *db.Backend) *Path {
+ return &Path{kvStore: kvStore}
+}
+
+// SubPath returns a path which points to a more specific db location
+func (p *Path) SubPath(path string) *Path {
+ path = strings.TrimRight(strings.TrimLeft(path, "/"), "/")
+ return &Path{
+ kvStore: p.kvStore,
+ path: p.path + path + "/",
}
}
-// List will retrieve information from the data model at the specified path location, and write it to the target slice
-// target must be a type of the form *[]<proto.Message Type> For example: *[]*voltha.Device
-func (p *Proxy) List(ctx context.Context, path string, target interface{}) error {
- completePath := p.path + path
+// Proxy contains all the information needed to reference a specific resource within the kv
+type Proxy Path
+// Proxy returns a new proxy which references the specified resource
+func (p *Path) Proxy(resource string) *Proxy {
+ resource = strings.TrimRight(strings.TrimLeft(resource, "/"), "/")
+ return &Proxy{
+ kvStore: p.kvStore,
+ path: p.path + resource + "/",
+ }
+}
+
+// List will retrieve information from the data model at the proxy's path location, and write it to the target slice
+// target must be a type of the form *[]<proto.Message Type> For example: *[]*voltha.Device
+func (p *Proxy) List(ctx context.Context, target interface{}) error {
logger.Debugw("proxy-list", log.Fields{
- "path": completePath,
+ "path": p.path,
})
// verify type of target is *[]*<type>
@@ -72,13 +87,13 @@
}
dataType := elemType.Elem() // type
- blobs, err := p.kvStore.List(ctx, completePath)
+ blobs, err := p.kvStore.List(ctx, p.path)
if err != nil {
- return fmt.Errorf("failed to retrieve %s from kvstore: %s", path, err)
+ return fmt.Errorf("failed to retrieve %s from kvstore: %s", p.path, err)
}
logger.Debugw("parsing-data-blobs", log.Fields{
- "path": path,
+ "path": p.path,
"size": len(blobs),
})
@@ -96,9 +111,9 @@
return nil
}
-// Get will retrieve information from the data model at the specified path location, and write it to target
-func (p *Proxy) Get(ctx context.Context, path string, target proto.Message) (bool, error) {
- completePath := p.path + path
+// Get will retrieve information from the data model at the proxy's path location, and write it to target
+func (p *Proxy) Get(ctx context.Context, id string, target proto.Message) (bool, error) {
+ completePath := p.path + id
logger.Debugw("proxy-get", log.Fields{
"path": completePath,
@@ -106,13 +121,13 @@
blob, err := p.kvStore.Get(ctx, completePath)
if err != nil {
- return false, fmt.Errorf("failed to retrieve %s from kvstore: %s", path, err)
+ return false, fmt.Errorf("failed to retrieve %s from kvstore: %s", completePath, err)
} else if blob == nil {
return false, nil // this blob does not exist
}
logger.Debugw("parsing-data-blobs", log.Fields{
- "path": path,
+ "path": completePath,
})
if err := proto.Unmarshal(blob.Value.([]byte), target); err != nil {
@@ -121,20 +136,9 @@
return true, nil
}
-// Update will modify information in the data model at the specified location with the provided data
-func (p *Proxy) Update(ctx context.Context, path string, data proto.Message) error {
- return p.add(ctx, path, data)
-}
-
-// AddWithID will insert new data at specified location.
-// This method also allows the user to specify the ID.
-func (p *Proxy) AddWithID(ctx context.Context, path string, id string, data proto.Message) error {
- return p.add(ctx, path+"/"+id, data)
-}
-
-// add will insert new data at specified location.
-func (p *Proxy) add(ctx context.Context, path string, data proto.Message) error {
- completePath := p.path + path
+// Set will add new or update existing entry at the proxy's path location
+func (p *Proxy) Set(ctx context.Context, id string, data proto.Message) error {
+ completePath := p.path + id
logger.Debugw("proxy-add", log.Fields{
"path": completePath,
@@ -151,9 +155,9 @@
return nil
}
-// Remove will delete an entry at the specified location
-func (p *Proxy) Remove(ctx context.Context, path string) error {
- completePath := p.path + path
+// Remove will delete an entry at the proxy's path location
+func (p *Proxy) Remove(ctx context.Context, id string) error {
+ completePath := p.path + id
logger.Debugw("proxy-remove", log.Fields{
"path": completePath,
diff --git a/db/model/proxy_test.go b/db/model/proxy_test.go
index 683e0a4..a73a071 100644
--- a/db/model/proxy_test.go
+++ b/db/model/proxy_test.go
@@ -19,6 +19,11 @@
"context"
"encoding/hex"
"encoding/json"
+ "strconv"
+ "strings"
+ "sync"
+ "testing"
+
"github.com/google/uuid"
"github.com/opencord/voltha-lib-go/v3/pkg/db"
"github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
@@ -27,10 +32,6 @@
"github.com/opencord/voltha-protos/v3/go/openflow_13"
"github.com/opencord/voltha-protos/v3/go/voltha"
"github.com/stretchr/testify/assert"
- "strconv"
- "strings"
- "sync"
- "testing"
)
var (
@@ -66,9 +67,10 @@
}
log.SetPackageLogLevel("github.com/opencord/voltha-go/db/model", log.DebugLevel)
- TestProxyRootLogicalDevice = NewProxy(mockBackend, "/")
- TestProxyRootDevice = NewProxy(mockBackend, "/")
- TestProxyRootAdapter = NewProxy(mockBackend, "/")
+ proxy := NewDBPath(mockBackend)
+ TestProxyRootLogicalDevice = proxy.Proxy("logical_devices")
+ TestProxyRootDevice = proxy.Proxy("device")
+ TestProxyRootAdapter = proxy.Proxy("adapters")
TestProxyLogicalPorts = []*voltha.LogicalPort{
{
@@ -169,7 +171,7 @@
TestProxyDeviceID = "0001" + hex.EncodeToString(devIDBin)[:12]
TestProxyDevice.Id = TestProxyDeviceID
- if err := TestProxyRootDevice.AddWithID(context.Background(), "devices", TestProxyDeviceID, TestProxyDevice); err != nil {
+ if err := TestProxyRootDevice.Set(context.Background(), TestProxyDeviceID, TestProxyDevice); err != nil {
BenchmarkProxyLogger.Errorf("Failed to add test proxy device due to error: %v", err)
t.Errorf("failed to add device: %s", err)
}
@@ -177,7 +179,7 @@
// Verify that the added device can now be retrieved
d := &voltha.Device{}
- if have, err := TestProxyRootDevice.Get(context.Background(), "devices/"+TestProxyDeviceID, d); err != nil {
+ if have, err := TestProxyRootDevice.Get(context.Background(), TestProxyDeviceID, d); err != nil {
BenchmarkProxyLogger.Errorf("Failed get device info from test proxy due to error: %v", err)
assert.NotNil(t, err)
} else if !have {
@@ -191,13 +193,13 @@
func TestProxy_1_1_2_Add_ExistingDevice(t *testing.T) {
TestProxyDevice.Id = TestProxyDeviceID
- if err := TestProxyRootDevice.add(context.Background(), "devices", TestProxyDevice); err != nil {
+ if err := TestProxyRootDevice.Set(context.Background(), "devices", TestProxyDevice); err != nil {
BenchmarkProxyLogger.Errorf("Failed to add device to test proxy due to error: %v", err)
assert.NotNil(t, err)
}
d := &voltha.Device{}
- if have, err := TestProxyRootDevice.Get(context.Background(), "devices/"+TestProxyDeviceID, d); err != nil {
+ if have, err := TestProxyRootDevice.Get(context.Background(), TestProxyDeviceID, d); err != nil {
BenchmarkProxyLogger.Errorf("Failed get device info from test proxy due to error: %v", err)
assert.NotNil(t, err)
} else if !have {
@@ -217,7 +219,7 @@
TestProxyAdapter.Id = TestProxyAdapterID
// Add the adapter
- if err := TestProxyRootAdapter.AddWithID(context.Background(), "adapters", TestProxyAdapterID, TestProxyAdapter); err != nil {
+ if err := TestProxyRootAdapter.Set(context.Background(), TestProxyAdapterID, TestProxyAdapter); err != nil {
BenchmarkProxyLogger.Errorf("Failed to add adapter to test proxy due to error: %v", err)
assert.NotNil(t, err)
} else {
@@ -226,7 +228,7 @@
// Verify that the added device can now be retrieved
d := &voltha.Device{}
- if have, err := TestProxyRootAdapter.Get(context.Background(), "adapters/"+TestProxyAdapterID, d); err != nil {
+ if have, err := TestProxyRootAdapter.Get(context.Background(), TestProxyAdapterID, d); err != nil {
BenchmarkProxyLogger.Errorf("Failed to retrieve device info from test proxy due to error: %v", err)
assert.NotNil(t, err)
} else if !have {
@@ -240,7 +242,7 @@
func TestProxy_1_2_1_Get_AllDevices(t *testing.T) {
var devices []*voltha.Device
- if err := TestProxyRootDevice.List(context.Background(), "devices", &devices); err != nil {
+ if err := TestProxyRootDevice.List(context.Background(), &devices); err != nil {
BenchmarkProxyLogger.Errorf("Failed to get all devices info from test proxy due to error: %v", err)
assert.NotNil(t, err)
}
@@ -255,7 +257,7 @@
func TestProxy_1_2_2_Get_SingleDevice(t *testing.T) {
d := &voltha.Device{}
- if have, err := TestProxyRootDevice.Get(context.Background(), "devices/"+TestProxyTargetDeviceID, d); err != nil {
+ if have, err := TestProxyRootDevice.Get(context.Background(), TestProxyTargetDeviceID, d); err != nil {
BenchmarkProxyLogger.Errorf("Failed to get single device info from test proxy due to error: %v", err)
assert.NotNil(t, err)
} else if !have {
@@ -270,7 +272,7 @@
var fwVersion int
retrieved := &voltha.Device{}
- if have, err := TestProxyRootDevice.Get(context.Background(), "devices/"+TestProxyTargetDeviceID, retrieved); err != nil {
+ if have, err := TestProxyRootDevice.Get(context.Background(), TestProxyTargetDeviceID, retrieved); err != nil {
BenchmarkProxyLogger.Errorf("Failed to get device info from test proxy due to error: %v", err)
assert.NotNil(t, err)
} else if !have {
@@ -287,12 +289,12 @@
retrieved.FirmwareVersion = strconv.Itoa(fwVersion)
- if err := TestProxyRootDevice.Update(context.Background(), "devices/"+TestProxyTargetDeviceID, retrieved); err != nil {
+ if err := TestProxyRootDevice.Set(context.Background(), TestProxyTargetDeviceID, retrieved); err != nil {
BenchmarkProxyLogger.Errorf("Failed to update device info test proxy due to error: %v", err)
assert.NotNil(t, err)
}
afterUpdate := &voltha.Device{}
- if have, err := TestProxyRootDevice.Get(context.Background(), "devices/"+TestProxyTargetDeviceID, afterUpdate); err != nil {
+ if have, err := TestProxyRootDevice.Get(context.Background(), TestProxyTargetDeviceID, afterUpdate); err != nil {
BenchmarkProxyLogger.Errorf("Failed to get device info from test proxy due to error: %v", err)
} else if !have {
t.Error("Failed to update device")
@@ -301,7 +303,7 @@
}
d := &voltha.Device{}
- if have, err := TestProxyRootDevice.Get(context.Background(), "devices/"+TestProxyTargetDeviceID, d); err != nil {
+ if have, err := TestProxyRootDevice.Get(context.Background(), TestProxyTargetDeviceID, d); err != nil {
BenchmarkProxyLogger.Errorf("Failed to get device info from test proxy due to error: %v", err)
assert.NotNil(t, err)
} else if !have {
@@ -315,10 +317,10 @@
func TestProxy_1_3_3_Update_Adapter(t *testing.T) {
- adaptersProxy := NewProxy(mockBackend, "/adapters")
+ adaptersProxy := NewDBPath(mockBackend).Proxy("adapters")
retrieved := &voltha.Adapter{}
- if have, err := TestProxyRootAdapter.Get(context.Background(), "adapters/"+TestProxyAdapterID, retrieved); err != nil {
+ if have, err := TestProxyRootAdapter.Get(context.Background(), TestProxyAdapterID, retrieved); err != nil {
BenchmarkProxyLogger.Errorf("Failed to retrieve adapter info from adapters proxy due to error: %v", err)
assert.NotNil(t, err)
} else if !have {
@@ -328,7 +330,7 @@
retrieved.Version = "test-adapter-version-2"
- if err := adaptersProxy.Update(context.Background(), TestProxyAdapterID, retrieved); err != nil {
+ if err := adaptersProxy.Set(context.Background(), TestProxyAdapterID, retrieved); err != nil {
BenchmarkProxyLogger.Errorf("Failed to update adapter info in adapters proxy due to error: %v", err)
assert.NotNil(t, err)
} else {
@@ -336,7 +338,7 @@
}
d := &voltha.Adapter{}
- if have, err := TestProxyRootAdapter.Get(context.Background(), "adapters/"+TestProxyAdapterID, d); err != nil {
+ if have, err := TestProxyRootAdapter.Get(context.Background(), TestProxyAdapterID, d); err != nil {
BenchmarkProxyLogger.Errorf("Failed to get updated adapter info from adapters proxy due to error: %v", err)
assert.NotNil(t, err)
} else if !have {
@@ -349,7 +351,7 @@
}
func TestProxy_1_4_1_Remove_Device(t *testing.T) {
- if err := TestProxyRootDevice.Remove(context.Background(), "devices/"+TestProxyDeviceID); err != nil {
+ if err := TestProxyRootDevice.Remove(context.Background(), TestProxyDeviceID); err != nil {
BenchmarkProxyLogger.Errorf("Failed to remove device from devices proxy due to error: %v", err)
t.Errorf("failed to remove device: %s", err)
} else {
@@ -357,7 +359,7 @@
}
d := &voltha.Device{}
- have, err := TestProxyRootDevice.Get(context.Background(), "devices/"+TestProxyDeviceID, d)
+ have, err := TestProxyRootDevice.Get(context.Background(), TestProxyDeviceID, d)
if err != nil {
BenchmarkProxyLogger.Errorf("Failed to get device info from devices proxy due to error: %v", err)
assert.NotNil(t, err)
@@ -375,7 +377,7 @@
TestProxyLogicalDeviceID = "0001" + hex.EncodeToString(ldIDBin)[:12]
TestProxyLogicalDevice.Id = TestProxyLogicalDeviceID
- if err := TestProxyRootLogicalDevice.AddWithID(context.Background(), "logical_devices", TestProxyLogicalDeviceID, TestProxyLogicalDevice); err != nil {
+ if err := TestProxyRootLogicalDevice.Set(context.Background(), TestProxyLogicalDeviceID, TestProxyLogicalDevice); err != nil {
BenchmarkProxyLogger.Errorf("Failed to add new logical device into proxy due to error: %v", err)
assert.NotNil(t, err)
} else {
@@ -383,7 +385,7 @@
}
ld := &voltha.LogicalDevice{}
- if have, err := TestProxyRootLogicalDevice.Get(context.Background(), "logical_devices/"+TestProxyLogicalDeviceID, ld); err != nil {
+ if have, err := TestProxyRootLogicalDevice.Get(context.Background(), TestProxyLogicalDeviceID, ld); err != nil {
BenchmarkProxyLogger.Errorf("Failed to get logical device info from logical device proxy due to error: %v", err)
assert.NotNil(t, err)
} else if !have {
@@ -397,7 +399,7 @@
func TestProxy_2_1_2_Add_ExistingLogicalDevice(t *testing.T) {
TestProxyLogicalDevice.Id = TestProxyLogicalDeviceID
- if err := TestProxyRootLogicalDevice.add(context.Background(), "logical_devices", TestProxyLogicalDevice); err != nil {
+ if err := TestProxyRootLogicalDevice.Set(context.Background(), "logical_devices", TestProxyLogicalDevice); err != nil {
BenchmarkProxyLogger.Errorf("Failed to add logical device due to error: %v", err)
assert.NotNil(t, err)
}
@@ -417,7 +419,7 @@
func TestProxy_2_2_1_Get_AllLogicalDevices(t *testing.T) {
var logicalDevices []*voltha.LogicalDevice
- if err := TestProxyRootLogicalDevice.List(context.Background(), "logical_devices", &logicalDevices); err != nil {
+ if err := TestProxyRootLogicalDevice.List(context.Background(), &logicalDevices); err != nil {
BenchmarkProxyLogger.Errorf("Failed to get all logical devices from proxy due to error: %v", err)
assert.NotNil(t, err)
}
@@ -432,7 +434,7 @@
func TestProxy_2_2_2_Get_SingleLogicalDevice(t *testing.T) {
ld := &voltha.LogicalDevice{}
- if have, err := TestProxyRootLogicalDevice.Get(context.Background(), "logical_devices/"+TestProxyTargetLogicalDeviceID, ld); err != nil {
+ if have, err := TestProxyRootLogicalDevice.Get(context.Background(), TestProxyTargetLogicalDeviceID, ld); err != nil {
BenchmarkProxyLogger.Errorf("Failed to get single logical device from proxy due to error: %v", err)
assert.NotNil(t, err)
} else if !have {
@@ -448,7 +450,7 @@
var fwVersion int
retrieved := &voltha.LogicalDevice{}
- if have, err := TestProxyRootLogicalDevice.Get(context.Background(), "logical_devices/"+TestProxyTargetLogicalDeviceID, retrieved); err != nil {
+ if have, err := TestProxyRootLogicalDevice.Get(context.Background(), TestProxyTargetLogicalDeviceID, retrieved); err != nil {
BenchmarkProxyLogger.Errorf("Failed to get logical devices due to error: %v", err)
assert.NotNil(t, err)
} else if !have {
@@ -465,7 +467,7 @@
retrieved.RootDeviceId = strconv.Itoa(fwVersion)
- if err := TestProxyRootLogicalDevice.Update(context.Background(), "logical_devices/"+TestProxyTargetLogicalDeviceID, retrieved); err != nil {
+ if err := TestProxyRootLogicalDevice.Set(context.Background(), TestProxyTargetLogicalDeviceID, retrieved); err != nil {
BenchmarkProxyLogger.Errorf("Faield to update logical device info due to error: %v", err)
assert.NotNil(t, err)
} else {
@@ -473,7 +475,7 @@
}
d := &voltha.LogicalDevice{}
- if have, err := TestProxyRootLogicalDevice.Get(context.Background(), "logical_devices/"+TestProxyTargetLogicalDeviceID, d); err != nil {
+ if have, err := TestProxyRootLogicalDevice.Get(context.Background(), TestProxyTargetLogicalDeviceID, d); err != nil {
BenchmarkProxyLogger.Errorf("Failed to get logical device info due to error: %v", err)
assert.NotNil(t, err)
} else if !have {
diff --git a/rw_core/core/adapter/manager.go b/rw_core/core/adapter/manager.go
index b552d8f..341624f 100644
--- a/rw_core/core/adapter/manager.go
+++ b/rw_core/core/adapter/manager.go
@@ -37,19 +37,21 @@
type Manager struct {
adapterAgents map[string]*agent
deviceTypes map[string]*voltha.DeviceType
- clusterDataProxy *model.Proxy
+ adapterProxy *model.Proxy
+ deviceTypeProxy *model.Proxy
onAdapterRestart adapterRestartedHandler
coreInstanceID string
lockAdaptersMap sync.RWMutex
lockdDeviceTypeToAdapterMap sync.RWMutex
}
-func NewAdapterManager(cdProxy *model.Proxy, coreInstanceID string, kafkaClient kafka.Client) *Manager {
+func NewAdapterManager(dbPath *model.Path, coreInstanceID string, kafkaClient kafka.Client) *Manager {
aMgr := &Manager{
- coreInstanceID: coreInstanceID,
- clusterDataProxy: cdProxy,
- deviceTypes: make(map[string]*voltha.DeviceType),
- adapterAgents: make(map[string]*agent),
+ coreInstanceID: coreInstanceID,
+ adapterProxy: dbPath.Proxy("adapters"),
+ deviceTypeProxy: dbPath.Proxy("device_types"),
+ deviceTypes: make(map[string]*voltha.DeviceType),
+ adapterAgents: make(map[string]*agent),
}
kafkaClient.SubscribeForMetadata(aMgr.updateLastAdapterCommunication)
return aMgr
@@ -82,7 +84,7 @@
func (aMgr *Manager) loadAdaptersAndDevicetypesInMemory() error {
// Load the adapters
var adapters []*voltha.Adapter
- if err := aMgr.clusterDataProxy.List(context.Background(), "adapters", &adapters); err != nil {
+ if err := aMgr.adapterProxy.List(context.Background(), &adapters); err != nil {
logger.Errorw("Failed-to-list-adapters-from-cluster-data-proxy", log.Fields{"error": err})
return err
}
@@ -98,7 +100,7 @@
// Load the device types
var deviceTypes []*voltha.DeviceType
- if err := aMgr.clusterDataProxy.List(context.Background(), "device_types", &deviceTypes); err != nil {
+ if err := aMgr.deviceTypeProxy.List(context.Background(), &deviceTypes); err != nil {
logger.Errorw("Failed-to-list-device-types-from-cluster-data-proxy", log.Fields{"error": err})
return err
}
@@ -134,11 +136,11 @@
if _, exist := aMgr.adapterAgents[adapter.Id]; !exist {
if saveToDb {
// Save the adapter to the KV store - first check if it already exist
- if have, err := aMgr.clusterDataProxy.Get(context.Background(), "adapters/"+adapter.Id, &voltha.Adapter{}); err != nil {
+ if have, err := aMgr.adapterProxy.Get(context.Background(), adapter.Id, &voltha.Adapter{}); err != nil {
logger.Errorw("failed-to-get-adapters-from-cluster-proxy", log.Fields{"error": err})
return err
} else if !have {
- if err := aMgr.clusterDataProxy.AddWithID(context.Background(), "adapters", adapter.Id, adapter); err != nil {
+ if err := aMgr.adapterProxy.Set(context.Background(), adapter.Id, adapter); err != nil {
logger.Errorw("failed-to-save-adapter", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
"currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint, "replica": adapter.CurrentReplica, "total": adapter.TotalReplicas})
return err
@@ -176,13 +178,13 @@
if saveToDb {
// Save the device types to the KV store
for _, deviceType := range deviceTypes.Items {
- if have, err := aMgr.clusterDataProxy.Get(context.Background(), "device_types/"+deviceType.Id, &voltha.DeviceType{}); err != nil {
+ if have, err := aMgr.deviceTypeProxy.Get(context.Background(), deviceType.Id, &voltha.DeviceType{}); err != nil {
logger.Errorw("Failed-to--device-types-from-cluster-data-proxy", log.Fields{"error": err})
return err
} else if !have {
// Does not exist - save it
clonedDType := (proto.Clone(deviceType)).(*voltha.DeviceType)
- if err := aMgr.clusterDataProxy.AddWithID(context.Background(), "device_types", deviceType.Id, clonedDType); err != nil {
+ if err := aMgr.deviceTypeProxy.Set(context.Background(), deviceType.Id, clonedDType); err != nil {
logger.Errorw("Failed-to-add-device-types-to-cluster-data-proxy", log.Fields{"error": err})
return err
}
diff --git a/rw_core/core/api/grpc_nbi_handler_test.go b/rw_core/core/api/grpc_nbi_handler_test.go
index 0c373ea..634a286 100755
--- a/rw_core/core/api/grpc_nbi_handler_test.go
+++ b/rw_core/core/api/grpc_nbi_handler_test.go
@@ -124,7 +124,7 @@
kafka.DeviceDiscoveryTopic(&kafka.Topic{Name: cfg.AffinityRouterTopic}))
endpointMgr := kafka.NewEndpointManager(backend)
- proxy := model.NewProxy(backend, "/")
+ proxy := model.NewDBPath(backend)
nb.adapterMgr = adapter.NewAdapterManager(proxy, nb.coreInstanceID, nb.kClient)
nb.deviceMgr, nb.logicalDeviceMgr = device.NewManagers(proxy, nb.adapterMgr, nb.kmp, endpointMgr, cfg.CorePairTopic, nb.coreInstanceID, cfg.DefaultCoreTimeout)
nb.adapterMgr.Start(ctx)
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 0dbecc8..524b52c 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -120,11 +120,11 @@
)
// defer kafkaClient.Stop()
- // create kv proxy
- proxy := model.NewProxy(backend, "/")
+ // create kv path
+ dbPath := model.NewDBPath(backend)
// load adapters & device types while other things are starting
- adapterMgr := adapter.NewAdapterManager(proxy, id, kafkaClient)
+ adapterMgr := adapter.NewAdapterManager(dbPath, id, kafkaClient)
go adapterMgr.Start(ctx)
// connect to kafka, then wait until reachable and publisher/consumer created
@@ -139,7 +139,7 @@
// create the core of the system, the device managers
endpointMgr := kafka.NewEndpointManager(backend)
- deviceMgr, logicalDeviceMgr := device.NewManagers(proxy, adapterMgr, kmp, endpointMgr, cf.CorePairTopic, id, cf.DefaultCoreTimeout)
+ deviceMgr, logicalDeviceMgr := device.NewManagers(dbPath, adapterMgr, kmp, endpointMgr, cf.CorePairTopic, id, cf.DefaultCoreTimeout)
// register kafka RPC handler
registerAdapterRequestHandlers(kmp, deviceMgr, adapterMgr, cf.CoreTopic, cf.CorePairTopic)
diff --git a/rw_core/core/device/agent.go b/rw_core/core/device/agent.go
index c8f03e7..918432f 100755
--- a/rw_core/core/device/agent.go
+++ b/rw_core/core/device/agent.go
@@ -44,25 +44,25 @@
// Agent represents device agent attributes
type Agent struct {
- deviceID string
- parentID string
- deviceType string
- isRootdevice bool
- adapterProxy *remote.AdapterProxy
- adapterMgr *adapter.Manager
- deviceMgr *Manager
- clusterDataProxy *model.Proxy
- exitChannel chan int
- device *voltha.Device
- requestQueue *coreutils.RequestQueue
- defaultTimeout time.Duration
- startOnce sync.Once
- stopOnce sync.Once
- stopped bool
+ deviceID string
+ parentID string
+ deviceType string
+ isRootdevice bool
+ adapterProxy *remote.AdapterProxy
+ adapterMgr *adapter.Manager
+ deviceMgr *Manager
+ dbProxy *model.Proxy
+ exitChannel chan int
+ device *voltha.Device
+ requestQueue *coreutils.RequestQueue
+ defaultTimeout time.Duration
+ startOnce sync.Once
+ stopOnce sync.Once
+ stopped bool
}
//newAgent creates a new device agent. The device will be initialized when start() is called.
-func newAgent(ap *remote.AdapterProxy, device *voltha.Device, deviceMgr *Manager, cdProxy *model.Proxy, timeout time.Duration) *Agent {
+func newAgent(ap *remote.AdapterProxy, device *voltha.Device, deviceMgr *Manager, deviceProxy *model.Proxy, timeout time.Duration) *Agent {
var agent Agent
agent.adapterProxy = ap
if device.Id == "" {
@@ -77,7 +77,7 @@
agent.deviceMgr = deviceMgr
agent.adapterMgr = deviceMgr.adapterMgr
agent.exitChannel = make(chan int, 1)
- agent.clusterDataProxy = cdProxy
+ agent.dbProxy = deviceProxy
agent.defaultTimeout = timeout
agent.device = proto.Clone(device).(*voltha.Device)
agent.requestQueue = coreutils.NewRequestQueue()
@@ -105,7 +105,7 @@
if deviceToCreate == nil {
// Load the existing device
device := &voltha.Device{}
- have, err := agent.clusterDataProxy.Get(ctx, "devices/"+agent.deviceID, device)
+ have, err := agent.dbProxy.Get(ctx, agent.deviceID, device)
if err != nil {
return nil, err
} else if !have {
@@ -118,8 +118,8 @@
logger.Infow("device-loaded-from-dB", log.Fields{"device-id": agent.deviceID})
} else {
// Create a new device
- // Assumption is that AdminState, FlowGroups, and Flows are unitialized since this
- // is a new device, so populate them here before passing the device to clusterDataProxy.AddWithId.
+ // Assumption is that AdminState, FlowGroups, and Flows are uninitialized since this
+ // is a new device, so populate them here before passing the device to ldProxy.Set.
// agent.deviceId will also have been set during newAgent().
device = (proto.Clone(deviceToCreate)).(*voltha.Device)
device.Id = agent.deviceID
@@ -133,7 +133,7 @@
}
// Add the initial device to the local model
- if err := agent.clusterDataProxy.AddWithID(ctx, "devices", agent.deviceID, device); err != nil {
+ if err := agent.dbProxy.Set(ctx, agent.deviceID, device); err != nil {
return nil, status.Errorf(codes.Aborted, "failed-adding-device-%s: %s", agent.deviceID, err)
}
agent.device = device
@@ -159,7 +159,7 @@
logger.Infow("stopping-device-agent", log.Fields{"deviceId": agent.deviceID, "parentId": agent.parentID})
// Remove the device from the KV store
- if err := agent.clusterDataProxy.Remove(ctx, "devices/"+agent.deviceID); err != nil {
+ if err := agent.dbProxy.Remove(ctx, agent.deviceID); err != nil {
return err
}
@@ -182,7 +182,7 @@
logger.Debug("reconciling-device-agent-devicetype")
// TODO: context timeout
device := &voltha.Device{}
- if have, err := agent.clusterDataProxy.Get(ctx, "devices/"+agent.deviceID, device); err != nil {
+ if have, err := agent.dbProxy.Get(ctx, agent.deviceID, device); err != nil {
logger.Errorw("kv-get-failed", log.Fields{"device-id": agent.deviceID, "error": err})
return
} else if !have {
@@ -1553,7 +1553,7 @@
return errors.New("device agent stopped")
}
- if err := agent.clusterDataProxy.Update(ctx, "devices/"+agent.deviceID, device); err != nil {
+ if err := agent.dbProxy.Set(ctx, agent.deviceID, device); err != nil {
return status.Errorf(codes.Internal, "failed-update-device:%s: %s", agent.deviceID, err)
}
logger.Debugw("updated-device-in-store", log.Fields{"deviceId: ": agent.deviceID})
diff --git a/rw_core/core/device/agent_test.go b/rw_core/core/device/agent_test.go
index 2abfdeb..ffc2a3b 100755
--- a/rw_core/core/device/agent_test.go
+++ b/rw_core/core/device/agent_test.go
@@ -137,7 +137,7 @@
kafka.DeviceDiscoveryTopic(&kafka.Topic{Name: cfg.AffinityRouterTopic}))
endpointMgr := kafka.NewEndpointManager(backend)
- proxy := model.NewProxy(backend, "/")
+ proxy := model.NewDBPath(backend)
adapterMgr := adapter.NewAdapterManager(proxy, dat.coreInstanceID, dat.kClient)
dat.deviceMgr, dat.logicalDeviceMgr = NewManagers(proxy, adapterMgr, dat.kmp, endpointMgr, cfg.CorePairTopic, dat.coreInstanceID, cfg.DefaultCoreTimeout)
@@ -194,7 +194,7 @@
func (dat *DATest) createDeviceAgent(t *testing.T) *Agent {
deviceMgr := dat.deviceMgr
clonedDevice := proto.Clone(dat.device).(*voltha.Device)
- deviceAgent := newAgent(deviceMgr.adapterProxy, clonedDevice, deviceMgr, deviceMgr.clusterDataProxy, deviceMgr.defaultTimeout)
+ deviceAgent := newAgent(deviceMgr.adapterProxy, clonedDevice, deviceMgr, deviceMgr.dProxy, deviceMgr.defaultTimeout)
d, err := deviceAgent.start(context.TODO(), clonedDevice)
assert.Nil(t, err)
assert.NotNil(t, d)
diff --git a/rw_core/core/device/flow/loader.go b/rw_core/core/device/flow/loader.go
index 741a45f..b407a3b 100644
--- a/rw_core/core/device/flow/loader.go
+++ b/rw_core/core/device/flow/loader.go
@@ -19,7 +19,6 @@
import (
"context"
"fmt"
- "strconv"
"sync"
"github.com/opencord/voltha-go/db/model"
@@ -35,8 +34,7 @@
lock sync.RWMutex
flows map[uint64]*chunk
- dbProxy *model.Proxy
- logicalDeviceID string // TODO: dbProxy should already have the logicalDeviceID component of the path internally
+ dbProxy *model.Proxy
}
// chunk keeps a flow and the lock for this flow
@@ -48,11 +46,10 @@
flow *ofp.OfpFlowStats
}
-func NewLoader(dataProxy *model.Proxy, logicalDeviceID string) *Loader {
+func NewLoader(dbProxy *model.Proxy) *Loader {
return &Loader{
- flows: make(map[uint64]*chunk),
- dbProxy: dataProxy,
- logicalDeviceID: logicalDeviceID,
+ flows: make(map[uint64]*chunk),
+ dbProxy: dbProxy,
}
}
@@ -63,7 +60,7 @@
defer loader.lock.Unlock()
var flows []*ofp.OfpFlowStats
- if err := loader.dbProxy.List(ctx, "logical_flows/"+loader.logicalDeviceID, &flows); err != nil {
+ if err := loader.dbProxy.List(ctx, &flows); err != nil {
logger.Errorw("failed-to-list-flows-from-cluster-data-proxy", log.Fields{"error": err})
return
}
@@ -88,9 +85,8 @@
entry.lock.Lock()
loader.lock.Unlock()
- flowID := strconv.FormatUint(uint64(flow.Id), 10)
- if err := loader.dbProxy.AddWithID(ctx, "logical_flows/"+loader.logicalDeviceID, flowID, flow); err != nil {
- logger.Errorw("failed-adding-flow-to-db", log.Fields{"deviceID": loader.logicalDeviceID, "flowID": flowID, "err": err})
+ if err := loader.dbProxy.Set(ctx, fmt.Sprint(flow.Id), flow); err != nil {
+ logger.Errorw("failed-adding-flow-to-db", log.Fields{"flowID": flow.Id, "err": err})
// revert the map
loader.lock.Lock()
@@ -147,9 +143,8 @@
// Update updates an existing flow in the kv.
// The provided "flow" must not be modified afterwards.
func (h *Handle) Update(ctx context.Context, flow *ofp.OfpFlowStats) error {
- path := fmt.Sprintf("logical_flows/%s/%d", h.loader.logicalDeviceID, flow.Id)
- if err := h.loader.dbProxy.Update(ctx, path, flow); err != nil {
- return status.Errorf(codes.Internal, "failed-update-flow:%s:%d %s", h.loader.logicalDeviceID, flow.Id, err)
+ if err := h.loader.dbProxy.Set(ctx, fmt.Sprint(flow.Id), flow); err != nil {
+ return status.Errorf(codes.Internal, "failed-update-flow-%v: %s", flow.Id, err)
}
h.chunk.flow = flow
return nil
@@ -157,9 +152,8 @@
// Delete removes the device from the kv
func (h *Handle) Delete(ctx context.Context) error {
- path := fmt.Sprintf("logical_flows/%s/%d", h.loader.logicalDeviceID, h.chunk.flow.Id)
- if err := h.loader.dbProxy.Remove(ctx, path); err != nil {
- return fmt.Errorf("couldnt-delete-flow-from-store-%s", path)
+ if err := h.loader.dbProxy.Remove(ctx, fmt.Sprint(h.chunk.flow.Id)); err != nil {
+ return fmt.Errorf("couldnt-delete-flow-from-store-%v", h.chunk.flow.Id)
}
h.chunk.deleted = true
diff --git a/rw_core/core/device/group/loader.go b/rw_core/core/device/group/loader.go
index 0e3f078..5b2890a 100644
--- a/rw_core/core/device/group/loader.go
+++ b/rw_core/core/device/group/loader.go
@@ -19,7 +19,6 @@
import (
"context"
"fmt"
- "strconv"
"sync"
"github.com/opencord/voltha-go/db/model"
@@ -35,8 +34,7 @@
lock sync.RWMutex
groups map[uint32]*chunk
- dbProxy *model.Proxy
- logicalDeviceID string // TODO: dbProxy should already have the logicalDeviceID component of the path internally
+ dbProxy *model.Proxy
}
// chunk keeps a group and the lock for this group
@@ -48,11 +46,10 @@
group *ofp.OfpGroupEntry
}
-func NewLoader(dataProxy *model.Proxy, logicalDeviceID string) *Loader {
+func NewLoader(dbProxy *model.Proxy) *Loader {
return &Loader{
- groups: make(map[uint32]*chunk),
- dbProxy: dataProxy,
- logicalDeviceID: logicalDeviceID,
+ groups: make(map[uint32]*chunk),
+ dbProxy: dbProxy,
}
}
@@ -63,7 +60,7 @@
defer loader.lock.Unlock()
var groups []*ofp.OfpGroupEntry
- if err := loader.dbProxy.List(ctx, "logical_groups/"+loader.logicalDeviceID, &groups); err != nil {
+ if err := loader.dbProxy.List(ctx, &groups); err != nil {
logger.Errorw("failed-to-list-groups-from-cluster-data-proxy", log.Fields{"error": err})
return
}
@@ -88,9 +85,8 @@
entry.lock.Lock()
loader.lock.Unlock()
- groupID := strconv.FormatUint(uint64(group.Desc.GroupId), 10)
- if err := loader.dbProxy.AddWithID(ctx, "logical_groups/"+loader.logicalDeviceID, groupID, group); err != nil {
- logger.Errorw("failed-adding-group-to-db", log.Fields{"deviceID": loader.logicalDeviceID, "groupID": groupID, "err": err})
+ if err := loader.dbProxy.Set(ctx, fmt.Sprint(group.Desc.GroupId), group); err != nil {
+ logger.Errorw("failed-adding-group-to-db", log.Fields{"groupID": group.Desc.GroupId, "err": err})
// revert the map
loader.lock.Lock()
@@ -147,9 +143,8 @@
// Update updates an existing group in the kv.
// The provided "group" must not be modified afterwards.
func (h *Handle) Update(ctx context.Context, group *ofp.OfpGroupEntry) error {
- path := fmt.Sprintf("logical_groups/%s/%d", h.loader.logicalDeviceID, group.Desc.GroupId)
- if err := h.loader.dbProxy.Update(ctx, path, group); err != nil {
- return status.Errorf(codes.Internal, "failed-update-group:%s:%d %s", h.loader.logicalDeviceID, group.Desc.GroupId, err)
+ if err := h.loader.dbProxy.Set(ctx, fmt.Sprint(group.Desc.GroupId), group); err != nil {
+ return status.Errorf(codes.Internal, "failed-update-group-%v: %s", group.Desc.GroupId, err)
}
h.chunk.group = group
return nil
@@ -157,9 +152,8 @@
// Delete removes the device from the kv
func (h *Handle) Delete(ctx context.Context) error {
- path := fmt.Sprintf("logical_groups/%s/%d", h.loader.logicalDeviceID, h.chunk.group.Desc.GroupId)
- if err := h.loader.dbProxy.Remove(ctx, path); err != nil {
- return fmt.Errorf("couldnt-delete-group-from-store-%s", path)
+ if err := h.loader.dbProxy.Remove(ctx, fmt.Sprint(h.chunk.group.Desc.GroupId)); err != nil {
+ return fmt.Errorf("couldnt-delete-group-from-store-%v", h.chunk.group.Desc.GroupId)
}
h.chunk.deleted = true
diff --git a/rw_core/core/device/logical_agent.go b/rw_core/core/device/logical_agent.go
index dbb2da5..f943106 100644
--- a/rw_core/core/device/logical_agent.go
+++ b/rw_core/core/device/logical_agent.go
@@ -47,7 +47,7 @@
rootDeviceID string
deviceMgr *Manager
ldeviceMgr *LogicalManager
- clusterDataProxy *model.Proxy
+ ldProxy *model.Proxy
stopped bool
deviceRoutes *route.DeviceRoutes
logicalPortsNo map[uint32]bool //value is true for NNI port
@@ -64,23 +64,23 @@
groupLoader *group.Loader
}
-func newLogicalDeviceAgent(id string, sn string, deviceID string, ldeviceMgr *LogicalManager,
- deviceMgr *Manager, cdProxy *model.Proxy, defaultTimeout time.Duration) *LogicalAgent {
+func newLogicalAgent(id string, sn string, deviceID string, ldeviceMgr *LogicalManager,
+ deviceMgr *Manager, dbProxy *model.Path, ldProxy *model.Proxy, defaultTimeout time.Duration) *LogicalAgent {
agent := &LogicalAgent{
- logicalDeviceID: id,
- serialNumber: sn,
- rootDeviceID: deviceID,
- deviceMgr: deviceMgr,
- clusterDataProxy: cdProxy,
- ldeviceMgr: ldeviceMgr,
- flowDecomposer: fd.NewFlowDecomposer(deviceMgr),
- logicalPortsNo: make(map[uint32]bool),
- defaultTimeout: defaultTimeout,
- requestQueue: coreutils.NewRequestQueue(),
+ logicalDeviceID: id,
+ serialNumber: sn,
+ rootDeviceID: deviceID,
+ deviceMgr: deviceMgr,
+ ldProxy: ldProxy,
+ ldeviceMgr: ldeviceMgr,
+ flowDecomposer: fd.NewFlowDecomposer(deviceMgr),
+ logicalPortsNo: make(map[uint32]bool),
+ defaultTimeout: defaultTimeout,
+ requestQueue: coreutils.NewRequestQueue(),
- flowLoader: flow.NewLoader(cdProxy, id),
- meterLoader: meter.NewLoader(cdProxy, id),
- groupLoader: group.NewLoader(cdProxy, id),
+ flowLoader: flow.NewLoader(dbProxy.SubPath("logical_flows").Proxy(id)),
+ groupLoader: group.NewLoader(dbProxy.SubPath("logical_groups").Proxy(id)),
+ meterLoader: meter.NewLoader(dbProxy.SubPath("logical_meters").Proxy(id)),
}
agent.deviceRoutes = route.NewDeviceRoutes(agent.logicalDeviceID, agent.deviceMgr.getDevice)
return agent
@@ -128,7 +128,7 @@
ld.Ports = []*voltha.LogicalPort{}
// Save the logical device
- if err := agent.clusterDataProxy.AddWithID(ctx, "logical_devices", ld.Id, ld); err != nil {
+ if err := agent.ldProxy.Set(ctx, ld.Id, ld); err != nil {
logger.Errorw("failed-to-add-logical-device", log.Fields{"logical-device-id": agent.logicalDeviceID})
return err
}
@@ -147,7 +147,7 @@
// load from dB - the logical may not exist at this time. On error, just return and the calling function
// will destroy this agent.
ld := &voltha.LogicalDevice{}
- have, err := agent.clusterDataProxy.Get(ctx, "logical_devices/"+agent.logicalDeviceID, ld)
+ have, err := agent.ldProxy.Get(ctx, agent.logicalDeviceID, ld)
if err != nil {
return err
} else if !have {
@@ -195,7 +195,7 @@
defer agent.requestQueue.RequestComplete()
//Remove the logical device from the model
- if err := agent.clusterDataProxy.Remove(ctx, "logical_devices/"+agent.logicalDeviceID); err != nil {
+ if err := agent.ldProxy.Remove(ctx, agent.logicalDeviceID); err != nil {
returnErr = err
} else {
logger.Debugw("logicaldevice-removed", log.Fields{"logicaldeviceId": agent.logicalDeviceID})
@@ -230,7 +230,7 @@
}
updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
- if err := agent.clusterDataProxy.Update(updateCtx, "logical_devices/"+agent.logicalDeviceID, logicalDevice); err != nil {
+ if err := agent.ldProxy.Set(updateCtx, agent.logicalDeviceID, logicalDevice); err != nil {
logger.Errorw("failed-to-update-logical-devices-to-cluster-proxy", log.Fields{"error": err})
return err
}
diff --git a/rw_core/core/device/logical_agent_test.go b/rw_core/core/device/logical_agent_test.go
index f5d404e..eb65673 100644
--- a/rw_core/core/device/logical_agent_test.go
+++ b/rw_core/core/device/logical_agent_test.go
@@ -22,14 +22,13 @@
"testing"
"time"
+ "github.com/gogo/protobuf/proto"
"github.com/opencord/voltha-go/db/model"
+ "github.com/opencord/voltha-go/rw_core/config"
"github.com/opencord/voltha-go/rw_core/core/adapter"
+ com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
"github.com/opencord/voltha-lib-go/v3/pkg/db"
fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
-
- "github.com/gogo/protobuf/proto"
- "github.com/opencord/voltha-go/rw_core/config"
- com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
mock_etcd "github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd"
mock_kafka "github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka"
@@ -481,7 +480,7 @@
kafka.DeviceDiscoveryTopic(&kafka.Topic{Name: cfg.AffinityRouterTopic}))
endpointMgr := kafka.NewEndpointManager(backend)
- proxy := model.NewProxy(backend, "/")
+ proxy := model.NewDBPath(backend)
adapterMgr := adapter.NewAdapterManager(proxy, lda.coreInstanceID, lda.kClient)
lda.deviceMgr, lda.logicalDeviceMgr = NewManagers(proxy, adapterMgr, lda.kmp, endpointMgr, cfg.CorePairTopic, lda.coreInstanceID, cfg.DefaultCoreTimeout)
@@ -509,9 +508,9 @@
clonedLD := proto.Clone(lda.logicalDevice).(*voltha.LogicalDevice)
clonedLD.Id = com.GetRandomString(10)
clonedLD.DatapathId = rand.Uint64()
- lDeviceAgent := newLogicalDeviceAgent(clonedLD.Id, clonedLD.Id, clonedLD.RootDeviceId, lDeviceMgr, deviceMgr, lDeviceMgr.clusterDataProxy, lDeviceMgr.defaultTimeout)
+ lDeviceAgent := newLogicalAgent(clonedLD.Id, clonedLD.Id, clonedLD.RootDeviceId, lDeviceMgr, deviceMgr, lDeviceMgr.dbProxy, lDeviceMgr.ldProxy, lDeviceMgr.defaultTimeout)
lDeviceAgent.logicalDevice = clonedLD
- err := lDeviceAgent.clusterDataProxy.AddWithID(context.Background(), "logical_devices", clonedLD.Id, clonedLD)
+ err := lDeviceAgent.ldProxy.Set(context.Background(), clonedLD.Id, clonedLD)
assert.Nil(t, err)
lDeviceMgr.addLogicalDeviceAgentToMap(lDeviceAgent)
return lDeviceAgent
diff --git a/rw_core/core/device/logical_manager.go b/rw_core/core/device/logical_manager.go
index cd7bce1..f9bff21 100644
--- a/rw_core/core/device/logical_manager.go
+++ b/rw_core/core/device/logical_manager.go
@@ -42,7 +42,8 @@
logicalDeviceAgents sync.Map
deviceMgr *Manager
kafkaICProxy kafka.InterContainerProxy
- clusterDataProxy *model.Proxy
+ dbProxy *model.Path
+ ldProxy *model.Proxy
defaultTimeout time.Duration
logicalDevicesLoadingLock sync.RWMutex
logicalDeviceLoadingInProgress map[string][]chan int
@@ -98,7 +99,7 @@
logger.Debug("ListAllLogicalDevices")
var logicalDevices []*voltha.LogicalDevice
- if err := ldMgr.clusterDataProxy.List(ctx, "logical_devices", &logicalDevices); err != nil {
+ if err := ldMgr.ldProxy.List(ctx, &logicalDevices); err != nil {
logger.Errorw("failed-to-list-logical-devices-from-cluster-proxy", log.Fields{"error": err})
return nil, err
}
@@ -125,7 +126,7 @@
logger.Debugw("logical-device-id", log.Fields{"logicaldeviceId": id})
- agent := newLogicalDeviceAgent(id, sn, device.Id, ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy, ldMgr.defaultTimeout)
+ agent := newLogicalAgent(id, sn, device.Id, ldMgr, ldMgr.deviceMgr, ldMgr.dbProxy, ldMgr.ldProxy, ldMgr.defaultTimeout)
ldMgr.addLogicalDeviceAgentToMap(agent)
// Update the root device with the logical device Id reference
@@ -135,7 +136,8 @@
}
go func() {
- //agent := newLogicalDeviceAgent(id, device.Id, ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy, ldMgr.defaultTimeout)
+ //TODO: either wait for the agent to be started before returning, or
+ // implement locks in the agent to ensure request are not processed before start() is complete
err := agent.start(context.Background(), false)
if err != nil {
logger.Errorw("unable-to-create-the-logical-device", log.Fields{"error": err})
@@ -173,7 +175,7 @@
//getLogicalDeviceFromModel retrieves the logical device data from the model.
func (ldMgr *LogicalManager) getLogicalDeviceFromModel(ctx context.Context, lDeviceID string) (*voltha.LogicalDevice, error) {
logicalDevice := &voltha.LogicalDevice{}
- if have, err := ldMgr.clusterDataProxy.Get(ctx, "logical_devices/"+lDeviceID, logicalDevice); err != nil {
+ if have, err := ldMgr.ldProxy.Get(ctx, lDeviceID, logicalDevice); err != nil {
logger.Errorw("failed-to-get-logical-devices-from-cluster-proxy", log.Fields{"error": err})
return nil, err
} else if !have {
@@ -196,7 +198,7 @@
ldMgr.logicalDevicesLoadingLock.Unlock()
if _, err := ldMgr.getLogicalDeviceFromModel(ctx, lDeviceID); err == nil {
logger.Debugw("loading-logical-device", log.Fields{"lDeviceId": lDeviceID})
- agent := newLogicalDeviceAgent(lDeviceID, "", "", ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy, ldMgr.defaultTimeout)
+ agent := newLogicalAgent(lDeviceID, "", "", ldMgr, ldMgr.deviceMgr, ldMgr.dbProxy, ldMgr.ldProxy, ldMgr.defaultTimeout)
if err := agent.start(ctx, true); err != nil {
return err
}
diff --git a/rw_core/core/device/manager.go b/rw_core/core/device/manager.go
index cf1301f..357c49a 100755
--- a/rw_core/core/device/manager.go
+++ b/rw_core/core/device/manager.go
@@ -19,7 +19,6 @@
import (
"context"
"errors"
- "github.com/opencord/voltha-go/rw_core/core/device/event"
"reflect"
"runtime"
"sync"
@@ -28,6 +27,7 @@
"github.com/golang/protobuf/ptypes/empty"
"github.com/opencord/voltha-go/db/model"
"github.com/opencord/voltha-go/rw_core/core/adapter"
+ "github.com/opencord/voltha-go/rw_core/core/device/event"
"github.com/opencord/voltha-go/rw_core/core/device/remote"
"github.com/opencord/voltha-go/rw_core/utils"
"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
@@ -50,20 +50,20 @@
logicalDeviceMgr *LogicalManager
kafkaICProxy kafka.InterContainerProxy
stateTransitions *TransitionMap
- clusterDataProxy *model.Proxy
+ dProxy *model.Proxy
coreInstanceID string
defaultTimeout time.Duration
devicesLoadingLock sync.RWMutex
deviceLoadingInProgress map[string][]chan int
}
-func NewManagers(proxy *model.Proxy, adapterMgr *adapter.Manager, kmp kafka.InterContainerProxy, endpointMgr kafka.EndpointManager, corePairTopic, coreInstanceID string, defaultCoreTimeout time.Duration) (*Manager, *LogicalManager) {
+func NewManagers(dbProxy *model.Path, adapterMgr *adapter.Manager, kmp kafka.InterContainerProxy, endpointMgr kafka.EndpointManager, corePairTopic, coreInstanceID string, defaultCoreTimeout time.Duration) (*Manager, *LogicalManager) {
deviceMgr := &Manager{
rootDevices: make(map[string]bool),
kafkaICProxy: kmp,
adapterProxy: remote.NewAdapterProxy(kmp, corePairTopic, endpointMgr),
coreInstanceID: coreInstanceID,
- clusterDataProxy: proxy,
+ dProxy: dbProxy.Proxy("devices"),
adapterMgr: adapterMgr,
defaultTimeout: defaultCoreTimeout * time.Millisecond,
deviceLoadingInProgress: make(map[string][]chan int),
@@ -74,7 +74,8 @@
Manager: event.NewManager(),
deviceMgr: deviceMgr,
kafkaICProxy: kmp,
- clusterDataProxy: proxy,
+ dbProxy: dbProxy,
+ ldProxy: dbProxy.Proxy("logical_devices"),
defaultTimeout: defaultCoreTimeout,
logicalDeviceLoadingInProgress: make(map[string][]chan int),
}
@@ -156,7 +157,7 @@
// Ensure this device is set as root
device.Root = true
// Create and start a device agent for that device
- agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
+ agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.dProxy, dMgr.defaultTimeout)
device, err = agent.start(ctx, device)
if err != nil {
logger.Errorw("Fail-to-start-device", log.Fields{"device-id": agent.deviceID, "error": err})
@@ -398,7 +399,7 @@
result := &voltha.Devices{}
var devices []*voltha.Device
- if err := dMgr.clusterDataProxy.List(ctx, "devices", &devices); err != nil {
+ if err := dMgr.dProxy.List(ctx, &devices); err != nil {
logger.Errorw("failed-to-list-devices-from-cluster-proxy", log.Fields{"error": err})
return nil, err
}
@@ -407,7 +408,7 @@
// If device is not in memory then set it up
if !dMgr.IsDeviceInCache(device.Id) {
logger.Debugw("loading-device-from-Model", log.Fields{"id": device.Id})
- agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
+ agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.dProxy, dMgr.defaultTimeout)
if _, err := agent.start(ctx, nil); err != nil {
logger.Warnw("failure-starting-agent", log.Fields{"deviceId": device.Id})
} else {
@@ -424,7 +425,7 @@
func (dMgr *Manager) isParentDeviceExist(ctx context.Context, newDevice *voltha.Device) (bool, error) {
hostPort := newDevice.GetHostAndPort()
var devices []*voltha.Device
- if err := dMgr.clusterDataProxy.List(ctx, "devices", &devices); err != nil {
+ if err := dMgr.dProxy.List(ctx, &devices); err != nil {
logger.Errorw("Failed to list devices from cluster data proxy", log.Fields{"error": err})
return false, err
}
@@ -445,7 +446,7 @@
//getDeviceFromModelretrieves the device data from the model.
func (dMgr *Manager) getDeviceFromModel(ctx context.Context, deviceID string) (*voltha.Device, error) {
device := &voltha.Device{}
- if have, err := dMgr.clusterDataProxy.Get(ctx, "devices/"+deviceID, device); err != nil {
+ if have, err := dMgr.dProxy.Get(ctx, deviceID, device); err != nil {
logger.Errorw("failed-to-get-device-info-from-cluster-proxy", log.Fields{"error": err})
return nil, err
} else if !have {
@@ -470,7 +471,7 @@
// Proceed with the loading only if the device exist in the Model (could have been deleted)
if device, err = dMgr.getDeviceFromModel(ctx, deviceID); err == nil {
logger.Debugw("loading-device", log.Fields{"deviceId": deviceID})
- agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
+ agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.dProxy, dMgr.defaultTimeout)
if _, err = agent.start(ctx, nil); err != nil {
logger.Warnw("Failure loading device", log.Fields{"deviceId": deviceID, "error": err})
} else {
@@ -1028,7 +1029,7 @@
childDevice.ProxyAddress = &voltha.Device_ProxyAddress{DeviceId: parentDeviceID, DeviceType: pAgent.deviceType, ChannelId: uint32(channelID), OnuId: uint32(onuID)}
// Create and start a device agent for that device
- agent := newAgent(dMgr.adapterProxy, childDevice, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
+ agent := newAgent(dMgr.adapterProxy, childDevice, dMgr, dMgr.dProxy, dMgr.defaultTimeout)
childDevice, err := agent.start(ctx, childDevice)
if err != nil {
logger.Errorw("error-starting-child-device", log.Fields{"parent-device-id": childDevice.ParentId, "child-device-id": agent.deviceID, "error": err})
diff --git a/rw_core/core/device/meter/loader.go b/rw_core/core/device/meter/loader.go
index ae6c957..daae9ae 100644
--- a/rw_core/core/device/meter/loader.go
+++ b/rw_core/core/device/meter/loader.go
@@ -19,7 +19,6 @@
import (
"context"
"fmt"
- "strconv"
"sync"
"github.com/opencord/voltha-go/db/model"
@@ -35,8 +34,7 @@
lock sync.RWMutex
meters map[uint32]*chunk
- dbProxy *model.Proxy
- logicalDeviceID string // TODO: dbProxy should already have the logicalDeviceID component of the path internally
+ dbProxy *model.Proxy
}
// chunk keeps a meter and the lock for this meter
@@ -48,11 +46,10 @@
meter *ofp.OfpMeterEntry
}
-func NewLoader(dataProxy *model.Proxy, logicalDeviceID string) *Loader {
+func NewLoader(dbProxy *model.Proxy) *Loader {
return &Loader{
- meters: make(map[uint32]*chunk),
- dbProxy: dataProxy,
- logicalDeviceID: logicalDeviceID,
+ meters: make(map[uint32]*chunk),
+ dbProxy: dbProxy,
}
}
@@ -63,7 +60,7 @@
defer loader.lock.Unlock()
var meters []*ofp.OfpMeterEntry
- if err := loader.dbProxy.List(ctx, "logical_meters/"+loader.logicalDeviceID, &meters); err != nil {
+ if err := loader.dbProxy.List(ctx, &meters); err != nil {
logger.Errorw("failed-to-list-meters-from-cluster-data-proxy", log.Fields{"error": err})
return
}
@@ -88,9 +85,8 @@
entry.lock.Lock()
loader.lock.Unlock()
- meterID := strconv.FormatUint(uint64(meter.Config.MeterId), 10)
- if err := loader.dbProxy.AddWithID(ctx, "logical_meters/"+loader.logicalDeviceID, meterID, meter); err != nil {
- logger.Errorw("failed-adding-meter-to-db", log.Fields{"deviceID": loader.logicalDeviceID, "meterID": meterID, "err": err})
+ if err := loader.dbProxy.Set(ctx, fmt.Sprint(meter.Config.MeterId), meter); err != nil {
+ logger.Errorw("failed-adding-meter-to-db", log.Fields{"meterID": meter.Config.MeterId, "err": err})
// revert the map
loader.lock.Lock()
@@ -147,9 +143,8 @@
// Update updates an existing meter in the kv.
// The provided "meter" must not be modified afterwards.
func (h *Handle) Update(ctx context.Context, meter *ofp.OfpMeterEntry) error {
- path := fmt.Sprintf("logical_meters/%s/%d", h.loader.logicalDeviceID, meter.Config.MeterId)
- if err := h.loader.dbProxy.Update(ctx, path, meter); err != nil {
- return status.Errorf(codes.Internal, "failed-update-meter:%s:%d %s", h.loader.logicalDeviceID, meter.Config.MeterId, err)
+ if err := h.loader.dbProxy.Set(ctx, fmt.Sprint(meter.Config.MeterId), meter); err != nil {
+ return status.Errorf(codes.Internal, "failed-update-meter-%v: %s", meter.Config.MeterId, err)
}
h.chunk.meter = meter
return nil
@@ -157,9 +152,8 @@
// Delete removes the device from the kv
func (h *Handle) Delete(ctx context.Context) error {
- path := fmt.Sprintf("logical_meters/%s/%d", h.loader.logicalDeviceID, h.chunk.meter.Config.MeterId)
- if err := h.loader.dbProxy.Remove(ctx, path); err != nil {
- return fmt.Errorf("couldnt-delete-meter-from-store-%s", path)
+ if err := h.loader.dbProxy.Remove(ctx, fmt.Sprint(h.chunk.meter.Config.MeterId)); err != nil {
+ return fmt.Errorf("couldnt-delete-meter-from-store-%v", h.chunk.meter.Config.MeterId)
}
h.chunk.deleted = true