VOL-2999 - Reworked how Proxies are created & used.
Added DB Paths to separate location specification logic from entry access logic.
Also merged Update() and AddWithID() into a single method, renamed Set().
Change-Id: I9ed5eafd63c180dddc5845a166554f89bda12325
diff --git a/rw_core/core/device/manager.go b/rw_core/core/device/manager.go
index cf1301f..357c49a 100755
--- a/rw_core/core/device/manager.go
+++ b/rw_core/core/device/manager.go
@@ -19,7 +19,6 @@
import (
"context"
"errors"
- "github.com/opencord/voltha-go/rw_core/core/device/event"
"reflect"
"runtime"
"sync"
@@ -28,6 +27,7 @@
"github.com/golang/protobuf/ptypes/empty"
"github.com/opencord/voltha-go/db/model"
"github.com/opencord/voltha-go/rw_core/core/adapter"
+ "github.com/opencord/voltha-go/rw_core/core/device/event"
"github.com/opencord/voltha-go/rw_core/core/device/remote"
"github.com/opencord/voltha-go/rw_core/utils"
"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
@@ -50,20 +50,20 @@
logicalDeviceMgr *LogicalManager
kafkaICProxy kafka.InterContainerProxy
stateTransitions *TransitionMap
- clusterDataProxy *model.Proxy
+ dProxy *model.Proxy
coreInstanceID string
defaultTimeout time.Duration
devicesLoadingLock sync.RWMutex
deviceLoadingInProgress map[string][]chan int
}
-func NewManagers(proxy *model.Proxy, adapterMgr *adapter.Manager, kmp kafka.InterContainerProxy, endpointMgr kafka.EndpointManager, corePairTopic, coreInstanceID string, defaultCoreTimeout time.Duration) (*Manager, *LogicalManager) {
+func NewManagers(dbProxy *model.Path, adapterMgr *adapter.Manager, kmp kafka.InterContainerProxy, endpointMgr kafka.EndpointManager, corePairTopic, coreInstanceID string, defaultCoreTimeout time.Duration) (*Manager, *LogicalManager) {
deviceMgr := &Manager{
rootDevices: make(map[string]bool),
kafkaICProxy: kmp,
adapterProxy: remote.NewAdapterProxy(kmp, corePairTopic, endpointMgr),
coreInstanceID: coreInstanceID,
- clusterDataProxy: proxy,
+ dProxy: dbProxy.Proxy("devices"),
adapterMgr: adapterMgr,
defaultTimeout: defaultCoreTimeout * time.Millisecond,
deviceLoadingInProgress: make(map[string][]chan int),
@@ -74,7 +74,8 @@
Manager: event.NewManager(),
deviceMgr: deviceMgr,
kafkaICProxy: kmp,
- clusterDataProxy: proxy,
+ dbProxy: dbProxy,
+ ldProxy: dbProxy.Proxy("logical_devices"),
defaultTimeout: defaultCoreTimeout,
logicalDeviceLoadingInProgress: make(map[string][]chan int),
}
@@ -156,7 +157,7 @@
// Ensure this device is set as root
device.Root = true
// Create and start a device agent for that device
- agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
+ agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.dProxy, dMgr.defaultTimeout)
device, err = agent.start(ctx, device)
if err != nil {
logger.Errorw("Fail-to-start-device", log.Fields{"device-id": agent.deviceID, "error": err})
@@ -398,7 +399,7 @@
result := &voltha.Devices{}
var devices []*voltha.Device
- if err := dMgr.clusterDataProxy.List(ctx, "devices", &devices); err != nil {
+ if err := dMgr.dProxy.List(ctx, &devices); err != nil {
logger.Errorw("failed-to-list-devices-from-cluster-proxy", log.Fields{"error": err})
return nil, err
}
@@ -407,7 +408,7 @@
// If device is not in memory then set it up
if !dMgr.IsDeviceInCache(device.Id) {
logger.Debugw("loading-device-from-Model", log.Fields{"id": device.Id})
- agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
+ agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.dProxy, dMgr.defaultTimeout)
if _, err := agent.start(ctx, nil); err != nil {
logger.Warnw("failure-starting-agent", log.Fields{"deviceId": device.Id})
} else {
@@ -424,7 +425,7 @@
func (dMgr *Manager) isParentDeviceExist(ctx context.Context, newDevice *voltha.Device) (bool, error) {
hostPort := newDevice.GetHostAndPort()
var devices []*voltha.Device
- if err := dMgr.clusterDataProxy.List(ctx, "devices", &devices); err != nil {
+ if err := dMgr.dProxy.List(ctx, &devices); err != nil {
logger.Errorw("Failed to list devices from cluster data proxy", log.Fields{"error": err})
return false, err
}
@@ -445,7 +446,7 @@
//getDeviceFromModelretrieves the device data from the model.
func (dMgr *Manager) getDeviceFromModel(ctx context.Context, deviceID string) (*voltha.Device, error) {
device := &voltha.Device{}
- if have, err := dMgr.clusterDataProxy.Get(ctx, "devices/"+deviceID, device); err != nil {
+ if have, err := dMgr.dProxy.Get(ctx, deviceID, device); err != nil {
logger.Errorw("failed-to-get-device-info-from-cluster-proxy", log.Fields{"error": err})
return nil, err
} else if !have {
@@ -470,7 +471,7 @@
// Proceed with the loading only if the device exist in the Model (could have been deleted)
if device, err = dMgr.getDeviceFromModel(ctx, deviceID); err == nil {
logger.Debugw("loading-device", log.Fields{"deviceId": deviceID})
- agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
+ agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.dProxy, dMgr.defaultTimeout)
if _, err = agent.start(ctx, nil); err != nil {
logger.Warnw("Failure loading device", log.Fields{"deviceId": deviceID, "error": err})
} else {
@@ -1028,7 +1029,7 @@
childDevice.ProxyAddress = &voltha.Device_ProxyAddress{DeviceId: parentDeviceID, DeviceType: pAgent.deviceType, ChannelId: uint32(channelID), OnuId: uint32(onuID)}
// Create and start a device agent for that device
- agent := newAgent(dMgr.adapterProxy, childDevice, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
+ agent := newAgent(dMgr.adapterProxy, childDevice, dMgr, dMgr.dProxy, dMgr.defaultTimeout)
childDevice, err := agent.start(ctx, childDevice)
if err != nil {
logger.Errorw("error-starting-child-device", log.Fields{"parent-device-id": childDevice.ParentId, "child-device-id": agent.deviceID, "error": err})